
root / lib / cmdlib.py @ 7dd30006


1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33

    
34
from ganeti import rpc
35
from ganeti import ssh
36
from ganeti import logger
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import config
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import opcodes
44
from ganeti import ssconf
45

    
46
class LogicalUnit(object):
47
  """Logical Unit base class.
48

49
  Subclasses must follow these rules:
50
    - implement CheckPrereq which also fills in the opcode instance
51
      with all the fields (even if as None)
52
    - implement Exec
53
    - implement BuildHooksEnv
54
    - redefine HPATH and HTYPE
55
    - optionally redefine their run requirements (REQ_CLUSTER,
56
      REQ_MASTER); note that all commands require root permissions
57

58
  """
59
  HPATH = None
60
  HTYPE = None
61
  _OP_REQP = []
62
  REQ_CLUSTER = True
63
  REQ_MASTER = True
64

    
65
  def __init__(self, processor, op, cfg, sstore):
66
    """Constructor for LogicalUnit.
67

68
    This needs to be overridden in derived classes in order to check op
69
    validity.
70

71
    """
72
    self.processor = processor
73
    self.op = op
74
    self.cfg = cfg
75
    self.sstore = sstore
76
    for attr_name in self._OP_REQP:
77
      attr_val = getattr(op, attr_name, None)
78
      if attr_val is None:
79
        raise errors.OpPrereqError("Required parameter '%s' missing" %
80
                                   attr_name)
81
    if self.REQ_CLUSTER:
82
      if not cfg.IsCluster():
83
        raise errors.OpPrereqError("Cluster not initialized yet,"
84
                                   " use 'gnt-cluster init' first.")
85
      if self.REQ_MASTER:
86
        master = sstore.GetMasterNode()
87
        if master != utils.HostInfo().name:
88
          raise errors.OpPrereqError("Commands must be run on the master"
89
                                     " node %s" % master)
90

    
91
  def CheckPrereq(self):
92
    """Check prerequisites for this LU.
93

94
    This method should check that the prerequisites for the execution
95
    of this LU are fulfilled. It can do internode communication, but
96
    it should be idempotent - no cluster or system changes are
97
    allowed.
98

99
    The method should raise errors.OpPrereqError in case something is
100
    not fulfilled. Its return value is ignored.
101

102
    This method should also update all the parameters of the opcode to
103
    their canonical form; e.g. a short node name must be fully
104
    expanded after this method has successfully completed (so that
105
    hooks, logging, etc. work correctly).
106

107
    """
108
    raise NotImplementedError
109

    
110
  def Exec(self, feedback_fn):
111
    """Execute the LU.
112

113
    This method should implement the actual work. It should raise
114
    errors.OpExecError for failures that are somewhat dealt with in
115
    code, or expected.
116

117
    """
118
    raise NotImplementedError
119

    
120
  def BuildHooksEnv(self):
121
    """Build hooks environment for this LU.
122

123
    This method should return a three-element tuple consisting of: a dict
124
    containing the environment that will be used for running the
125
    specific hook for this LU, a list of node names on which the hook
126
    should run before the execution, and a list of node names on which
127
    the hook should run after the execution.
128

129
    The keys of the dict must not have 'GANETI_' prefixed as this will
130
    be handled in the hooks runner. Also note additional keys will be
131
    added by the hooks runner. If the LU doesn't define any
132
    environment, an empty dict (and not None) should be returned.
133

134
    As for the node lists, the master should not be included in
135
    them, as it will be added by the hooks runner in case this LU
136
    requires a cluster to run on (otherwise we don't have a node
137
    list). No nodes should be returned as an empty list (and not
138
    None).
139

140
    Note that if the HPATH for a LU class is None, this function will
141
    not be called.
142

143
    """
144
    raise NotImplementedError
145

    
146

    
147
class NoHooksLU(LogicalUnit):
148
  """Simple LU which runs no hooks.
149

150
  This LU is intended as a parent for other LogicalUnits which will
151
  run no hooks, in order to reduce duplicate code.
152

153
  """
154
  HPATH = None
155
  HTYPE = None
156

    
157
  def BuildHooksEnv(self):
158
    """Build hooks env.
159

160
    This is a no-op, since we don't run hooks.
161

162
    """
163
    return {}, [], []
164

    
165

    
166
def _GetWantedNodes(lu, nodes):
167
  """Returns list of checked and expanded node names.
168

169
  Args:
170
    nodes: List of nodes (strings) or None for all
171

172
  """
173
  if not isinstance(nodes, list):
174
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
175

    
176
  if nodes:
177
    wanted = []
178

    
179
    for name in nodes:
180
      node = lu.cfg.ExpandNodeName(name)
181
      if node is None:
182
        raise errors.OpPrereqError("No such node name '%s'" % name)
183
      wanted.append(node)
184

    
185
  else:
186
    wanted = lu.cfg.GetNodeList()
187
  return utils.NiceSort(wanted)
188

    
189

    
190
def _GetWantedInstances(lu, instances):
191
  """Returns list of checked and expanded instance names.
192

193
  Args:
194
    instances: List of instances (strings) or None for all
195

196
  """
197
  if not isinstance(instances, list):
198
    raise errors.OpPrereqError("Invalid argument type 'instances'")
199

    
200
  if instances:
201
    wanted = []
202

    
203
    for name in instances:
204
      instance = lu.cfg.ExpandInstanceName(name)
205
      if instance is None:
206
        raise errors.OpPrereqError("No such instance name '%s'" % name)
207
      wanted.append(instance)
208

    
209
  else:
210
    wanted = lu.cfg.GetInstanceList()
211
  return utils.NiceSort(wanted)
212

    
213

    
214
def _CheckOutputFields(static, dynamic, selected):
215
  """Checks whether all selected fields are valid.
216

217
  Args:
218
    static: Static fields
219
    dynamic: Dynamic fields
220

221
  """
222
  static_fields = frozenset(static)
223
  dynamic_fields = frozenset(dynamic)
224

    
225
  all_fields = static_fields | dynamic_fields
226

    
227
  if not all_fields.issuperset(selected):
228
    raise errors.OpPrereqError("Unknown output fields selected: %s"
229
                               % ",".join(frozenset(selected).
230
                                          difference(all_fields)))
231

    
232

    
233
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
234
                          memory, vcpus, nics):
235
  """Builds instance related env variables for hooks from single variables.
236

237
  Args:
238
    secondary_nodes: List of secondary nodes as strings
239
  """
240
  env = {
241
    "OP_TARGET": name,
242
    "INSTANCE_NAME": name,
243
    "INSTANCE_PRIMARY": primary_node,
244
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
245
    "INSTANCE_OS_TYPE": os_type,
246
    "INSTANCE_STATUS": status,
247
    "INSTANCE_MEMORY": memory,
248
    "INSTANCE_VCPUS": vcpus,
249
  }
250

    
251
  if nics:
252
    nic_count = len(nics)
253
    for idx, (ip, bridge) in enumerate(nics):
254
      if ip is None:
255
        ip = ""
256
      env["INSTANCE_NIC%d_IP" % idx] = ip
257
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
258
  else:
259
    nic_count = 0
260

    
261
  env["INSTANCE_NIC_COUNT"] = nic_count
262

    
263
  return env
264

    
265

    
266
def _BuildInstanceHookEnvByObject(instance, override=None):
267
  """Builds instance related env variables for hooks from an object.
268

269
  Args:
270
    instance: objects.Instance object of instance
271
    override: dict of values to override
272
  """
273
  args = {
274
    'name': instance.name,
275
    'primary_node': instance.primary_node,
276
    'secondary_nodes': instance.secondary_nodes,
277
    'os_type': instance.os,
278
    'status': instance.status,
279
    'memory': instance.memory,
280
    'vcpus': instance.vcpus,
281
    'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
282
  }
283
  if override:
284
    args.update(override)
285
  return _BuildInstanceHookEnv(**args)
286

    
287

    
288
def _UpdateEtcHosts(fullnode, ip):
289
  """Ensure a node has a correct entry in /etc/hosts.
290

291
  Args:
292
    fullnode - Fully qualified domain name of host. (str)
293
    ip       - IPv4 address of host (str)
294

295
  """
296
  node = fullnode.split(".", 1)[0]
297

    
298
  f = open('/etc/hosts', 'r+')
299

    
300
  inthere = False
301

    
302
  save_lines = []
303
  add_lines = []
304
  removed = False
305

    
306
  while True:
307
    rawline = f.readline()
308

    
309
    if not rawline:
310
      # End of file
311
      break
312

    
313
    line = rawline.split('\n')[0]
314

    
315
    # Strip off comments
316
    line = line.split('#')[0]
317

    
318
    if not line:
319
      # Entire line was comment, skip
320
      save_lines.append(rawline)
321
      continue
322

    
323
    fields = line.split()
324

    
325
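    # classify this /etc/hosts line: does it already mention all of
    # (ip, fqdn, short name), only some of them, or none at all?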
    haveall = True
326
    havesome = False
327
    for spec in [ ip, fullnode, node ]:
328
      if spec not in fields:
329
        haveall = False
330
      if spec in fields:
331
        havesome = True
332

    
333
    if haveall:
334
      inthere = True
335
      save_lines.append(rawline)
336
      continue
337

    
338
    if havesome and not haveall:
339
      # Line (old, or manual?) which is missing some.  Remove.
340
      removed = True
341
      continue
342

    
343
    save_lines.append(rawline)
344

    
345
  if not inthere:
346
    add_lines.append('%s\t%s %s\n' % (ip, fullnode, node))
347

    
348
  if removed:
349
    if add_lines:
350
      save_lines = save_lines + add_lines
351

    
352
    # We removed a line, write a new file and replace old.
353
    fd, tmpname = tempfile.mkstemp('tmp', 'hosts_', '/etc')
354
    newfile = os.fdopen(fd, 'w')
355
    newfile.write(''.join(save_lines))
356
    newfile.close()
357
    os.rename(tmpname, '/etc/hosts')
358

    
359
  elif add_lines:
360
    # Simply appending a new line will do the trick.
361
    f.seek(0, 2)
362
    for add in add_lines:
363
      f.write(add)
364

    
365
  f.close()
366

    
367

    
368
def _UpdateKnownHosts(fullnode, ip, pubkey):
369
  """Ensure a node has a correct known_hosts entry.
370

371
  Args:
372
    fullnode - Fully qualified domain name of host. (str)
373
    ip       - IPv4 address of host (str)
374
    pubkey   - the public key of the cluster
375

376
  """
377
  if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
378
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
379
  else:
380
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
381

    
382
  inthere = False
383

    
384
  save_lines = []
385
  add_lines = []
386
  removed = False
387

    
388
  for rawline in f:
389
    logger.Debug('read %s' % (repr(rawline),))
390

    
391
    parts = rawline.rstrip('\r\n').split()
392

    
393
    # Ignore unwanted lines
394
    if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
395
      fields = parts[0].split(',')
396
      key = parts[2]
397

    
398
      haveall = True
399
      havesome = False
400
      for spec in [ ip, fullnode ]:
401
        if spec not in fields:
402
          haveall = False
403
        if spec in fields:
404
          havesome = True
405

    
406
      logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
407
      if haveall and key == pubkey:
408
        inthere = True
409
        save_lines.append(rawline)
410
        logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
411
        continue
412

    
413
      if havesome and (not haveall or key != pubkey):
414
        removed = True
415
        logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
416
        continue
417

    
418
    save_lines.append(rawline)
419

    
420
  if not inthere:
421
    add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
422
    logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
423

    
424
  if removed:
425
    save_lines = save_lines + add_lines
426

    
427
    # Write a new file and replace old.
428
    fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
429
                                   constants.DATA_DIR)
430
    newfile = os.fdopen(fd, 'w')
431
    try:
432
      newfile.write(''.join(save_lines))
433
    finally:
434
      newfile.close()
435
    logger.Debug("Wrote new known_hosts.")
436
    os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
437

    
438
  elif add_lines:
439
    # Simply appending a new line will do the trick.
440
    f.seek(0, 2)
441
    for add in add_lines:
442
      f.write(add)
443

    
444
  f.close()
445

    
446

    
447
def _HasValidVG(vglist, vgname):
448
  """Checks if the volume group list is valid.
449

450
  A non-None return value means there's an error, and the return value
451
  is the error message.
452

453
  """
454
  vgsize = vglist.get(vgname, None)
455
  if vgsize is None:
456
    return "volume group '%s' missing" % vgname
457
  elif vgsize < 20480:
458
    return ("volume group '%s' too small (20480MiB required, %dMib found)" %
459
            (vgname, vgsize))
460
  return None
461

    
462

    
463
def _InitSSHSetup(node):
464
  """Setup the SSH configuration for the cluster.
465

466

467
  This generates a dsa keypair for root, adds the pub key to the
468
  permitted hosts and adds the hostkey to its own known hosts.
469

470
  Args:
471
    node: the name of this host as a fqdn
472

473
  """
474
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
475

    
476
  for name in priv_key, pub_key:
477
    if os.path.exists(name):
478
      utils.CreateBackup(name)
479
    utils.RemoveFile(name)
480

    
481
  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
482
                         "-f", priv_key,
483
                         "-q", "-N", ""])
484
  if result.failed:
485
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
486
                             result.output)
487

    
488
  f = open(pub_key, 'r')
489
  try:
490
    utils.AddAuthorizedKey(auth_keys, f.read(8192))
491
  finally:
492
    f.close()
493

    
494

    
495
def _InitGanetiServerSetup(ss):
496
  """Setup the necessary configuration for the initial node daemon.
497

498
  This creates the nodepass file containing the shared password for
499
  the cluster and also generates the SSL certificate.
500

501
  """
502
  # Create pseudo random password
503
  randpass = sha.new(os.urandom(64)).hexdigest()
504
  # and write it into sstore
505
  ss.SetKey(ss.SS_NODED_PASS, randpass)
506

    
507
  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
508
                         "-days", str(365*5), "-nodes", "-x509",
509
                         "-keyout", constants.SSL_CERT_FILE,
510
                         "-out", constants.SSL_CERT_FILE, "-batch"])
511
  if result.failed:
512
    raise errors.OpExecError("could not generate server ssl cert, command"
513
                             " %s had exitcode %s and error message %s" %
514
                             (result.cmd, result.exit_code, result.output))
515

    
516
  os.chmod(constants.SSL_CERT_FILE, 0400)
517

    
518
  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
519

    
520
  if result.failed:
521
    raise errors.OpExecError("Could not start the node daemon, command %s"
522
                             " had exitcode %s and error %s" %
523
                             (result.cmd, result.exit_code, result.output))
524

    
525

    
526
def _CheckInstanceBridgesExist(instance):
527
  """Check that the brigdes needed by an instance exist.
528

529
  """
530
  # check bridges existence
531
  brlist = [nic.bridge for nic in instance.nics]
532
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
533
    raise errors.OpPrereqError("one or more target bridges %s does not"
534
                               " exist on destination node '%s'" %
535
                               (brlist, instance.primary_node))
536

    
537

    
538
class LUInitCluster(LogicalUnit):
539
  """Initialise the cluster.
540

541
  """
542
  HPATH = "cluster-init"
543
  HTYPE = constants.HTYPE_CLUSTER
544
  _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
545
              "def_bridge", "master_netdev"]
546
  REQ_CLUSTER = False
547

    
548
  def BuildHooksEnv(self):
549
    """Build hooks env.
550

551
    Notes: Since we don't require a cluster, we must manually add
552
    ourselves in the post-run node list.
553

554
    """
555
    env = {"OP_TARGET": self.op.cluster_name}
556
    return env, [], [self.hostname.name]
557

    
558
  def CheckPrereq(self):
559
    """Verify that the passed name is a valid one.
560

561
    """
562
    if config.ConfigWriter.IsCluster():
563
      raise errors.OpPrereqError("Cluster is already initialised")
564

    
565
    self.hostname = hostname = utils.HostInfo()
566

    
567
    if hostname.ip.startswith("127."):
568
      raise errors.OpPrereqError("This host's IP resolves to the private"
569
                                 " range (%s). Please fix DNS or /etc/hosts." %
570
                                 (hostname.ip,))
571

    
572
    self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
573

    
574
    if not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, hostname.ip,
575
                         constants.DEFAULT_NODED_PORT):
576
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
577
                                 " to %s,\nbut this ip address does not"
578
                                 " belong to this host."
579
                                 " Aborting." % hostname.ip)
580

    
581
    secondary_ip = getattr(self.op, "secondary_ip", None)
582
    if secondary_ip and not utils.IsValidIP(secondary_ip):
583
      raise errors.OpPrereqError("Invalid secondary ip given")
584
    if (secondary_ip and
585
        secondary_ip != hostname.ip and
586
        (not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, secondary_ip,
587
                           constants.DEFAULT_NODED_PORT))):
588
      raise errors.OpPrereqError("You gave %s as secondary IP,\n"
589
                                 "but it does not belong to this host." %
590
                                 secondary_ip)
591
    self.secondary_ip = secondary_ip
592

    
593
    # checks presence of the volume group given
594
    vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
595

    
596
    if vgstatus:
597
      raise errors.OpPrereqError("Error: %s" % vgstatus)
598

    
599
    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
600
                    self.op.mac_prefix):
601
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
602
                                 self.op.mac_prefix)
603

    
604
    if self.op.hypervisor_type not in hypervisor.VALID_HTYPES:
605
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
606
                                 self.op.hypervisor_type)
607

    
608
    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
609
    if result.failed:
610
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
611
                                 (self.op.master_netdev,
612
                                  result.output.strip()))
613

    
614
    if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
615
            os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
616
      raise errors.OpPrereqError("Init.d script '%s' missing or not "
617
                                 "executable." % constants.NODE_INITD_SCRIPT)
618

    
619
  def Exec(self, feedback_fn):
620
    """Initialize the cluster.
621

622
    """
623
    clustername = self.clustername
624
    hostname = self.hostname
625

    
626
    # set up the simple store
627
    self.sstore = ss = ssconf.SimpleStore()
628
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
629
    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
630
    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
631
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
632
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
633

    
634
    # set up the inter-node password and certificate
635
    _InitGanetiServerSetup(ss)
636

    
637
    # start the master ip
638
    rpc.call_node_start_master(hostname.name)
639

    
640
    # set up ssh config and /etc/hosts
641
    f = open(constants.SSH_HOST_RSA_PUB, 'r')
642
    try:
643
      sshline = f.read()
644
    finally:
645
      f.close()
646
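    # the public key file has the form "<type> <base64 key> [comment]";
    # keep only the key itself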
    sshkey = sshline.split(" ")[1]
647

    
648
    _UpdateEtcHosts(hostname.name, hostname.ip)
649

    
650
    _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
651

    
652
    _InitSSHSetup(hostname.name)
653

    
654
    # init of cluster config file
655
    self.cfg = cfgw = config.ConfigWriter()
656
    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
657
                    sshkey, self.op.mac_prefix,
658
                    self.op.vg_name, self.op.def_bridge)
659

    
660

    
661
class LUDestroyCluster(NoHooksLU):
662
  """Logical unit for destroying the cluster.
663

664
  """
665
  _OP_REQP = []
666

    
667
  def CheckPrereq(self):
668
    """Check prerequisites.
669

670
    This checks whether the cluster is empty.
671

672
    Any errors are signalled by raising errors.OpPrereqError.
673

674
    """
675
    master = self.sstore.GetMasterNode()
676

    
677
    nodelist = self.cfg.GetNodeList()
678
    if len(nodelist) != 1 or nodelist[0] != master:
679
      raise errors.OpPrereqError("There are still %d node(s) in"
680
                                 " this cluster." % (len(nodelist) - 1))
681
    instancelist = self.cfg.GetInstanceList()
682
    if instancelist:
683
      raise errors.OpPrereqError("There are still %d instance(s) in"
684
                                 " this cluster." % len(instancelist))
685

    
686
  def Exec(self, feedback_fn):
687
    """Destroys the cluster.
688

689
    """
690
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
691
    utils.CreateBackup(priv_key)
692
    utils.CreateBackup(pub_key)
693
    rpc.call_node_leave_cluster(self.sstore.GetMasterNode())
694

    
695

    
696
class LUVerifyCluster(NoHooksLU):
697
  """Verifies the cluster status.
698

699
  """
700
  _OP_REQP = []
701

    
702
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
703
                  remote_version, feedback_fn):
704
    """Run multiple tests against a node.
705

706
    Test list:
707
      - compares ganeti version
708
      - checks vg existence and size > 20G
709
      - checks config file checksum
710
      - checks ssh to other nodes
711

712
    Args:
713
      node: name of the node to check
714
      file_list: required list of files
715
      local_cksum: dictionary of local files and their checksums
716

717
    """
718
    # compares ganeti version
719
    local_version = constants.PROTOCOL_VERSION
720
    if not remote_version:
721
      feedback_fn(" - ERROR: connection to %s failed" % (node))
722
      return True
723

    
724
    if local_version != remote_version:
725
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
726
                      (local_version, node, remote_version))
727
      return True
728

    
729
    # checks vg existence and size > 20G
730

    
731
    bad = False
732
    if not vglist:
733
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
734
                      (node,))
735
      bad = True
736
    else:
737
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
738
      if vgstatus:
739
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
740
        bad = True
741

    
742
    # checks config file checksum
743
    # checks ssh to any
744

    
745
    if 'filelist' not in node_result:
746
      bad = True
747
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
748
    else:
749
      remote_cksum = node_result['filelist']
750
      for file_name in file_list:
751
        if file_name not in remote_cksum:
752
          bad = True
753
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
754
        elif remote_cksum[file_name] != local_cksum[file_name]:
755
          bad = True
756
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
757

    
758
    if 'nodelist' not in node_result:
759
      bad = True
760
      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
761
    else:
762
      if node_result['nodelist']:
763
        bad = True
764
        for node in node_result['nodelist']:
765
          feedback_fn("  - ERROR: communication with node '%s': %s" %
766
                          (node, node_result['nodelist'][node]))
767
    hyp_result = node_result.get('hypervisor', None)
768
    if hyp_result is not None:
769
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
770
    return bad
771

    
772
  def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
773
    """Verify an instance.
774

775
    This function checks to see if the required block devices are
776
    available on the instance's node.
777

778
    """
779
    bad = False
780

    
781
    instancelist = self.cfg.GetInstanceList()
782
    if not instance in instancelist:
783
      feedback_fn("  - ERROR: instance %s not in instance list %s" %
784
                      (instance, instancelist))
785
      bad = True
786

    
787
    instanceconfig = self.cfg.GetInstanceInfo(instance)
788
    node_current = instanceconfig.primary_node
789

    
790
    node_vol_should = {}
791
    instanceconfig.MapLVsByNode(node_vol_should)
792

    
793
    for node in node_vol_should:
794
      for volume in node_vol_should[node]:
795
        if node not in node_vol_is or volume not in node_vol_is[node]:
796
          feedback_fn("  - ERROR: volume %s missing on node %s" %
797
                          (volume, node))
798
          bad = True
799

    
800
    if not instanceconfig.status == 'down':
801
      if not instance in node_instance[node_current]:
802
        feedback_fn("  - ERROR: instance %s not running on node %s" %
803
                        (instance, node_current))
804
        bad = True
805

    
806
    for node in node_instance:
807
      if (not node == node_current):
808
        if instance in node_instance[node]:
809
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
810
                          (instance, node))
811
          bad = True
812

    
813
    return bad
814

    
815
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
816
    """Verify if there are any unknown volumes in the cluster.
817

818
    The .os, .swap and backup volumes are ignored. All other volumes are
819
    reported as unknown.
820

821
    """
822
    bad = False
823

    
824
    for node in node_vol_is:
825
      for volume in node_vol_is[node]:
826
        if node not in node_vol_should or volume not in node_vol_should[node]:
827
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
828
                      (volume, node))
829
          bad = True
830
    return bad
831

    
832
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
833
    """Verify the list of running instances.
834

835
    This checks what instances are running but unknown to the cluster.
836

837
    """
838
    bad = False
839
    for node in node_instance:
840
      for runninginstance in node_instance[node]:
841
        if runninginstance not in instancelist:
842
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
843
                          (runninginstance, node))
844
          bad = True
845
    return bad
846

    
847
  def CheckPrereq(self):
848
    """Check prerequisites.
849

850
    This has no prerequisites.
851

852
    """
853
    pass
854

    
855
  def Exec(self, feedback_fn):
856
    """Verify integrity of cluster, performing various test on nodes.
857

858
    """
859
    bad = False
860
    feedback_fn("* Verifying global settings")
861
    self.cfg.VerifyConfig()
862

    
863
    vg_name = self.cfg.GetVGName()
864
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
865
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
866
    node_volume = {}
867
    node_instance = {}
868

    
869
    # FIXME: verify OS list
870
    # do local checksums
871
    file_names = list(self.sstore.GetFileList())
872
    file_names.append(constants.SSL_CERT_FILE)
873
    file_names.append(constants.CLUSTER_CONF_FILE)
874
    local_checksums = utils.FingerprintFiles(file_names)
875

    
876
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
877
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
878
    all_instanceinfo = rpc.call_instance_list(nodelist)
879
    all_vglist = rpc.call_vg_list(nodelist)
880
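    # parameters for the remote node_verify call: checksum the listed
    # files and check connectivity to the listed nodes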
    node_verify_param = {
881
      'filelist': file_names,
882
      'nodelist': nodelist,
883
      'hypervisor': None,
884
      }
885
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
886
    all_rversion = rpc.call_version(nodelist)
887

    
888
    for node in nodelist:
889
      feedback_fn("* Verifying node %s" % node)
890
      result = self._VerifyNode(node, file_names, local_checksums,
891
                                all_vglist[node], all_nvinfo[node],
892
                                all_rversion[node], feedback_fn)
893
      bad = bad or result
894

    
895
      # node_volume
896
      volumeinfo = all_volumeinfo[node]
897

    
898
      if type(volumeinfo) != dict:
899
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
900
        bad = True
901
        continue
902

    
903
      node_volume[node] = volumeinfo
904

    
905
      # node_instance
906
      nodeinstance = all_instanceinfo[node]
907
      if type(nodeinstance) != list:
908
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
909
        bad = True
910
        continue
911

    
912
      node_instance[node] = nodeinstance
913

    
914
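    # map of node name to the logical volumes that should exist on it,
    # filled in below from each instance's disk layout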
    node_vol_should = {}
915

    
916
    for instance in instancelist:
917
      feedback_fn("* Verifying instance %s" % instance)
918
      result =  self._VerifyInstance(instance, node_volume, node_instance,
919
                                     feedback_fn)
920
      bad = bad or result
921

    
922
      inst_config = self.cfg.GetInstanceInfo(instance)
923

    
924
      inst_config.MapLVsByNode(node_vol_should)
925

    
926
    feedback_fn("* Verifying orphan volumes")
927
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
928
                                       feedback_fn)
929
    bad = bad or result
930

    
931
    feedback_fn("* Verifying remaining instances")
932
    result = self._VerifyOrphanInstances(instancelist, node_instance,
933
                                         feedback_fn)
934
    bad = bad or result
935

    
936
    return int(bad)
937

    
938

    
939
class LURenameCluster(LogicalUnit):
940
  """Rename the cluster.
941

942
  """
943
  HPATH = "cluster-rename"
944
  HTYPE = constants.HTYPE_CLUSTER
945
  _OP_REQP = ["name"]
946

    
947
  def BuildHooksEnv(self):
948
    """Build hooks env.
949

950
    """
951
    env = {
952
      "OP_TARGET": self.op.sstore.GetClusterName(),
953
      "NEW_NAME": self.op.name,
954
      }
955
    mn = self.sstore.GetMasterNode()
956
    return env, [mn], [mn]
957

    
958
  def CheckPrereq(self):
959
    """Verify that the passed name is a valid one.
960

961
    """
962
    hostname = utils.HostInfo(self.op.name)
963

    
964
    new_name = hostname.name
965
    self.ip = new_ip = hostname.ip
966
    old_name = self.sstore.GetClusterName()
967
    old_ip = self.sstore.GetMasterIP()
968
    if new_name == old_name and new_ip == old_ip:
969
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
970
                                 " cluster has changed")
971
    if new_ip != old_ip:
972
      result = utils.RunCmd(["fping", "-q", new_ip])
973
      if not result.failed:
974
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
975
                                   " reachable on the network. Aborting." %
976
                                   new_ip)
977

    
978
    self.op.name = new_name
979

    
980
  def Exec(self, feedback_fn):
981
    """Rename the cluster.
982

983
    """
984
    clustername = self.op.name
985
    ip = self.ip
986
    ss = self.sstore
987

    
988
    # shutdown the master IP
989
    master = ss.GetMasterNode()
990
    if not rpc.call_node_stop_master(master):
991
      raise errors.OpExecError("Could not disable the master role")
992

    
993
    try:
994
      # modify the sstore
995
      ss.SetKey(ss.SS_MASTER_IP, ip)
996
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
997

    
998
      # Distribute updated ss config to all nodes
999
      myself = self.cfg.GetNodeInfo(master)
1000
      dist_nodes = self.cfg.GetNodeList()
1001
      if myself.name in dist_nodes:
1002
        dist_nodes.remove(myself.name)
1003

    
1004
      logger.Debug("Copying updated ssconf data to all nodes")
1005
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1006
        fname = ss.KeyToFilename(keyname)
1007
        result = rpc.call_upload_file(dist_nodes, fname)
1008
        for to_node in dist_nodes:
1009
          if not result[to_node]:
1010
            logger.Error("copy of file %s to node %s failed" %
1011
                         (fname, to_node))
1012
    finally:
1013
      if not rpc.call_node_start_master(master):
1014
        logger.Error("Could not re-enable the master role on the master,\n"
1015
                     "please restart manually.")
1016

    
1017

    
1018
def _WaitForSync(cfgw, instance, oneshot=False, unlock=False):
1019
  """Sleep and poll for an instance's disk to sync.
1020

1021
  """
1022
  if not instance.disks:
1023
    return True
1024

    
1025
  if not oneshot:
1026
    logger.ToStdout("Waiting for instance %s to sync disks." % instance.name)
1027

    
1028
  node = instance.primary_node
1029

    
1030
  for dev in instance.disks:
1031
    cfgw.SetDiskID(dev, node)
1032

    
1033
  retries = 0
1034
  while True:
1035
    max_time = 0
1036
    done = True
1037
    cumul_degraded = False
1038
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1039
    if not rstats:
1040
      logger.ToStderr("Can't get any data from node %s" % node)
1041
      retries += 1
1042
      if retries >= 10:
1043
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1044
                                 " aborting." % node)
1045
      time.sleep(6)
1046
      continue
1047
    retries = 0
1048
    for i in range(len(rstats)):
1049
      mstat = rstats[i]
1050
      if mstat is None:
1051
        logger.ToStderr("Can't compute data for node %s/%s" %
1052
                        (node, instance.disks[i].iv_name))
1053
        continue
1054
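      # perc_done is None once the device is fully synced; any other
      # value means resynchronisation is still in progress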
      perc_done, est_time, is_degraded = mstat
1055
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1056
      if perc_done is not None:
1057
        done = False
1058
        if est_time is not None:
1059
          rem_time = "%d estimated seconds remaining" % est_time
1060
          max_time = est_time
1061
        else:
1062
          rem_time = "no time estimate"
1063
        logger.ToStdout("- device %s: %5.2f%% done, %s" %
1064
                        (instance.disks[i].iv_name, perc_done, rem_time))
1065
    if done or oneshot:
1066
      break
1067

    
1068
    if unlock:
1069
      utils.Unlock('cmd')
1070
    try:
1071
      time.sleep(min(60, max_time))
1072
    finally:
1073
      if unlock:
1074
        utils.Lock('cmd')
1075

    
1076
  if done:
1077
    logger.ToStdout("Instance %s's disks are in sync." % instance.name)
1078
  return not cumul_degraded
1079

    
1080

    
1081
def _CheckDiskConsistency(cfgw, dev, node, on_primary):
1082
  """Check that mirrors are not degraded.
1083

1084
  """
1085
  cfgw.SetDiskID(dev, node)
1086

    
1087
  result = True
1088
  if on_primary or dev.AssembleOnSecondary():
1089
    rstats = rpc.call_blockdev_find(node, dev)
1090
    if not rstats:
1091
      logger.ToStderr("Can't get any data from node %s" % node)
1092
      result = False
1093
    else:
1094
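      # rstats[5] is the remote device's degraded flag; a degraded
      # mirror marks the device as inconsistent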
      result = result and (not rstats[5])
1095
  if dev.children:
1096
    for child in dev.children:
1097
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1098

    
1099
  return result
1100

    
1101

    
1102
class LUDiagnoseOS(NoHooksLU):
1103
  """Logical unit for OS diagnose/query.
1104

1105
  """
1106
  _OP_REQP = []
1107

    
1108
  def CheckPrereq(self):
1109
    """Check prerequisites.
1110

1111
    This always succeeds, since this is a pure query LU.
1112

1113
    """
1114
    return
1115

    
1116
  def Exec(self, feedback_fn):
1117
    """Compute the list of OSes.
1118

1119
    """
1120
    node_list = self.cfg.GetNodeList()
1121
    node_data = rpc.call_os_diagnose(node_list)
1122
    if node_data == False:
1123
      raise errors.OpExecError("Can't gather the list of OSes")
1124
    return node_data
1125

    
1126

    
1127
class LURemoveNode(LogicalUnit):
1128
  """Logical unit for removing a node.
1129

1130
  """
1131
  HPATH = "node-remove"
1132
  HTYPE = constants.HTYPE_NODE
1133
  _OP_REQP = ["node_name"]
1134

    
1135
  def BuildHooksEnv(self):
1136
    """Build hooks env.
1137

1138
    This doesn't run on the target node in the pre phase as a failed
1139
    node would not allow itself to run.
1140

1141
    """
1142
    env = {
1143
      "OP_TARGET": self.op.node_name,
1144
      "NODE_NAME": self.op.node_name,
1145
      }
1146
    all_nodes = self.cfg.GetNodeList()
1147
    all_nodes.remove(self.op.node_name)
1148
    return env, all_nodes, all_nodes
1149

    
1150
  def CheckPrereq(self):
1151
    """Check prerequisites.
1152

1153
    This checks:
1154
     - the node exists in the configuration
1155
     - it does not have primary or secondary instances
1156
     - it's not the master
1157

1158
    Any errors are signalled by raising errors.OpPrereqError.
1159

1160
    """
1161
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1162
    if node is None:
1163
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1164

    
1165
    instance_list = self.cfg.GetInstanceList()
1166

    
1167
    masternode = self.sstore.GetMasterNode()
1168
    if node.name == masternode:
1169
      raise errors.OpPrereqError("Node is the master node,"
1170
                                 " you need to failover first.")
1171

    
1172
    for instance_name in instance_list:
1173
      instance = self.cfg.GetInstanceInfo(instance_name)
1174
      if node.name == instance.primary_node:
1175
        raise errors.OpPrereqError("Instance %s still running on the node,"
1176
                                   " please remove first." % instance_name)
1177
      if node.name in instance.secondary_nodes:
1178
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1179
                                   " please remove first." % instance_name)
1180
    self.op.node_name = node.name
1181
    self.node = node
1182

    
1183
  def Exec(self, feedback_fn):
1184
    """Removes the node from the cluster.
1185

1186
    """
1187
    node = self.node
1188
    logger.Info("stopping the node daemon and removing configs from node %s" %
1189
                node.name)
1190

    
1191
    rpc.call_node_leave_cluster(node.name)
1192

    
1193
    ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1194

    
1195
    logger.Info("Removing node %s from config" % node.name)
1196

    
1197
    self.cfg.RemoveNode(node.name)
1198

    
1199

    
1200
class LUQueryNodes(NoHooksLU):
1201
  """Logical unit for querying nodes.
1202

1203
  """
1204
  _OP_REQP = ["output_fields", "names"]
1205

    
1206
  def CheckPrereq(self):
1207
    """Check prerequisites.
1208

1209
    This checks that the fields required are valid output fields.
1210

1211
    """
1212
    self.dynamic_fields = frozenset(["dtotal", "dfree",
1213
                                     "mtotal", "mnode", "mfree",
1214
                                     "bootid"])
1215

    
1216
    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1217
                               "pinst_list", "sinst_list",
1218
                               "pip", "sip"],
1219
                       dynamic=self.dynamic_fields,
1220
                       selected=self.op.output_fields)
1221

    
1222
    self.wanted = _GetWantedNodes(self, self.op.names)
1223

    
1224
  def Exec(self, feedback_fn):
1225
    """Computes the list of nodes and their attributes.
1226

1227
    """
1228
    nodenames = self.wanted
1229
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1230

    
1231
    # begin data gathering
1232

    
1233
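    # only contact the nodes if at least one dynamic (live) field was
    # requested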
    if self.dynamic_fields.intersection(self.op.output_fields):
1234
      live_data = {}
1235
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1236
      for name in nodenames:
1237
        nodeinfo = node_data.get(name, None)
1238
        if nodeinfo:
1239
          live_data[name] = {
1240
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1241
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1242
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1243
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1244
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1245
            "bootid": nodeinfo['bootid'],
1246
            }
1247
        else:
1248
          live_data[name] = {}
1249
    else:
1250
      live_data = dict.fromkeys(nodenames, {})
1251

    
1252
    node_to_primary = dict([(name, set()) for name in nodenames])
1253
    node_to_secondary = dict([(name, set()) for name in nodenames])
1254

    
1255
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1256
                             "sinst_cnt", "sinst_list"))
1257
    if inst_fields & frozenset(self.op.output_fields):
1258
      instancelist = self.cfg.GetInstanceList()
1259

    
1260
      for instance_name in instancelist:
1261
        inst = self.cfg.GetInstanceInfo(instance_name)
1262
        if inst.primary_node in node_to_primary:
1263
          node_to_primary[inst.primary_node].add(inst.name)
1264
        for secnode in inst.secondary_nodes:
1265
          if secnode in node_to_secondary:
1266
            node_to_secondary[secnode].add(inst.name)
1267

    
1268
    # end data gathering
1269

    
1270
    output = []
1271
    for node in nodelist:
1272
      node_output = []
1273
      for field in self.op.output_fields:
1274
        if field == "name":
1275
          val = node.name
1276
        elif field == "pinst_list":
1277
          val = list(node_to_primary[node.name])
1278
        elif field == "sinst_list":
1279
          val = list(node_to_secondary[node.name])
1280
        elif field == "pinst_cnt":
1281
          val = len(node_to_primary[node.name])
1282
        elif field == "sinst_cnt":
1283
          val = len(node_to_secondary[node.name])
1284
        elif field == "pip":
1285
          val = node.primary_ip
1286
        elif field == "sip":
1287
          val = node.secondary_ip
1288
        elif field in self.dynamic_fields:
1289
          val = live_data[node.name].get(field, None)
1290
        else:
1291
          raise errors.ParameterError(field)
1292
        node_output.append(val)
1293
      output.append(node_output)
1294

    
1295
    return output
1296

    
1297

    
1298
class LUQueryNodeVolumes(NoHooksLU):
1299
  """Logical unit for getting volumes on node(s).
1300

1301
  """
1302
  _OP_REQP = ["nodes", "output_fields"]
1303

    
1304
  def CheckPrereq(self):
1305
    """Check prerequisites.
1306

1307
    This checks that the fields required are valid output fields.
1308

1309
    """
1310
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1311

    
1312
    _CheckOutputFields(static=["node"],
1313
                       dynamic=["phys", "vg", "name", "size", "instance"],
1314
                       selected=self.op.output_fields)
1315

    
1316

    
1317
  def Exec(self, feedback_fn):
1318
    """Computes the list of nodes and their attributes.
1319

1320
    """
1321
    nodenames = self.nodes
1322
    volumes = rpc.call_node_volumes(nodenames)
1323

    
1324
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1325
             in self.cfg.GetInstanceList()]
1326

    
1327
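    # map each instance to its logical volumes grouped by node, so the
    # owning instance of a volume can be looked up below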
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1328

    
1329
    output = []
1330
    for node in nodenames:
1331
      if node not in volumes or not volumes[node]:
1332
        continue
1333

    
1334
      node_vols = volumes[node][:]
1335
      node_vols.sort(key=lambda vol: vol['dev'])
1336

    
1337
      for vol in node_vols:
1338
        node_output = []
1339
        for field in self.op.output_fields:
1340
          if field == "node":
1341
            val = node
1342
          elif field == "phys":
1343
            val = vol['dev']
1344
          elif field == "vg":
1345
            val = vol['vg']
1346
          elif field == "name":
1347
            val = vol['name']
1348
          elif field == "size":
1349
            val = int(float(vol['size']))
1350
          elif field == "instance":
1351
            for inst in ilist:
1352
              if node not in lv_by_node[inst]:
1353
                continue
1354
              if vol['name'] in lv_by_node[inst][node]:
1355
                val = inst.name
1356
                break
1357
            else:
1358
              val = '-'
1359
          else:
1360
            raise errors.ParameterError(field)
1361
          node_output.append(str(val))
1362

    
1363
        output.append(node_output)
1364

    
1365
    return output
1366

    
1367

    
1368
class LUAddNode(LogicalUnit):
1369
  """Logical unit for adding node to the cluster.
1370

1371
  """
1372
  HPATH = "node-add"
1373
  HTYPE = constants.HTYPE_NODE
1374
  _OP_REQP = ["node_name"]
1375

    
1376
  def BuildHooksEnv(self):
1377
    """Build hooks env.
1378

1379
    This will run on all nodes before, and on all nodes + the new node after.
1380

1381
    """
1382
    env = {
1383
      "OP_TARGET": self.op.node_name,
1384
      "NODE_NAME": self.op.node_name,
1385
      "NODE_PIP": self.op.primary_ip,
1386
      "NODE_SIP": self.op.secondary_ip,
1387
      }
1388
    nodes_0 = self.cfg.GetNodeList()
1389
    nodes_1 = nodes_0 + [self.op.node_name, ]
1390
    return env, nodes_0, nodes_1
1391

    
1392
  def CheckPrereq(self):
1393
    """Check prerequisites.
1394

1395
    This checks:
1396
     - the new node is not already in the config
1397
     - it is resolvable
1398
     - its parameters (single/dual homed) matches the cluster
1399

1400
    Any errors are signalled by raising errors.OpPrereqError.
1401

1402
    """
1403
    node_name = self.op.node_name
1404
    cfg = self.cfg
1405

    
1406
    dns_data = utils.HostInfo(node_name)
1407

    
1408
    node = dns_data.name
1409
    primary_ip = self.op.primary_ip = dns_data.ip
1410
    secondary_ip = getattr(self.op, "secondary_ip", None)
1411
    if secondary_ip is None:
1412
      secondary_ip = primary_ip
1413
    if not utils.IsValidIP(secondary_ip):
1414
      raise errors.OpPrereqError("Invalid secondary IP given")
1415
    self.op.secondary_ip = secondary_ip
1416
    node_list = cfg.GetNodeList()
1417
    if node in node_list:
1418
      raise errors.OpPrereqError("Node %s is already in the configuration"
1419
                                 % node)
1420

    
1421
    for existing_node_name in node_list:
1422
      existing_node = cfg.GetNodeInfo(existing_node_name)
1423
      if (existing_node.primary_ip == primary_ip or
1424
          existing_node.secondary_ip == primary_ip or
1425
          existing_node.primary_ip == secondary_ip or
1426
          existing_node.secondary_ip == secondary_ip):
1427
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1428
                                   " existing node %s" % existing_node.name)
1429

    
1430
    # check that the type of the node (single versus dual homed) is the
1431
    # same as for the master
1432
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1433
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1434
    newbie_singlehomed = secondary_ip == primary_ip
1435
    if master_singlehomed != newbie_singlehomed:
1436
      if master_singlehomed:
1437
        raise errors.OpPrereqError("The master has no private ip but the"
1438
                                   " new node has one")
1439
      else:
1440
        raise errors.OpPrereqError("The master has a private ip but the"
1441
                                   " new node doesn't have one")
1442

    
1443
    # checks reachability
1444
    if not utils.TcpPing(utils.HostInfo().name,
1445
                         primary_ip,
1446
                         constants.DEFAULT_NODED_PORT):
1447
      raise errors.OpPrereqError("Node not reachable by ping")
1448

    
1449
    if not newbie_singlehomed:
1450
      # check reachability from my secondary ip to newbie's secondary ip
1451
      if not utils.TcpPing(myself.secondary_ip,
1452
                           secondary_ip,
1453
                           constants.DEFAULT_NODED_PORT):
1454
        raise errors.OpPrereqError(
1455
          "Node secondary ip not reachable by TCP based ping to noded port")
1456

    
1457
    self.new_node = objects.Node(name=node,
1458
                                 primary_ip=primary_ip,
1459
                                 secondary_ip=secondary_ip)
1460

    
1461
  def Exec(self, feedback_fn):
1462
    """Adds the new node to the cluster.
1463

1464
    """
1465
    new_node = self.new_node
1466
    node = new_node.name
1467

    
1468
    # set up inter-node password and certificate and restarts the node daemon
1469
    gntpass = self.sstore.GetNodeDaemonPassword()
1470
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1471
      raise errors.OpExecError("ganeti password corruption detected")
1472
    f = open(constants.SSL_CERT_FILE)
1473
    try:
1474
      gntpem = f.read(8192)
1475
    finally:
1476
      f.close()
1477
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1478
    # so we use this to detect an invalid certificate; as long as the
1479
    # cert doesn't contain this, the here-document will be correctly
1480
    # parsed by the shell sequence below
1481
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
1482
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1483
    if not gntpem.endswith("\n"):
1484
      raise errors.OpExecError("PEM must end with newline")
1485
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1486

    
1487
    # and then connect with ssh to set password and start ganeti-noded
1488
    # note that all the below variables are sanitized at this point,
1489
    # either by being constants or by the checks above
1490
    ss = self.sstore
1491
    mycommand = ("umask 077 && "
1492
                 "echo '%s' > '%s' && "
1493
                 "cat > '%s' << '!EOF.' && \n"
1494
                 "%s!EOF.\n%s restart" %
1495
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1496
                  constants.SSL_CERT_FILE, gntpem,
1497
                  constants.NODE_INITD_SCRIPT))
1498

    
1499
    result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1500
    if result.failed:
1501
      raise errors.OpExecError("Remote command on node %s, error: %s,"
1502
                               " output: %s" %
1503
                               (node, result.fail_reason, result.output))
1504

    
1505
    # check connectivity
1506
    time.sleep(4)
1507

    
1508
    result = rpc.call_version([node])[node]
1509
    if result:
1510
      if constants.PROTOCOL_VERSION == result:
1511
        logger.Info("communication to node %s fine, sw version %s match" %
1512
                    (node, result))
1513
      else:
1514
        raise errors.OpExecError("Version mismatch master version %s,"
1515
                                 " node version %s" %
1516
                                 (constants.PROTOCOL_VERSION, result))
1517
    else:
1518
      raise errors.OpExecError("Cannot get version from the new node")
1519

    
1520
    # setup ssh on node
1521
    logger.Info("copy ssh key to node %s" % node)
1522
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1523
    keyarray = []
1524
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1525
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1526
                priv_key, pub_key]
1527

    
1528
    for i in keyfiles:
1529
      f = open(i, 'r')
1530
      try:
1531
        keyarray.append(f.read())
1532
      finally:
1533
        f.close()
1534

    
1535
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1536
                               keyarray[3], keyarray[4], keyarray[5])
1537

    
1538
    if not result:
1539
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1540

    
1541
    # Add node to our /etc/hosts, and add key to known_hosts
1542
    _UpdateEtcHosts(new_node.name, new_node.primary_ip)
1543
    _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1544
                      self.cfg.GetHostKey())
1545

    
1546
    if new_node.secondary_ip != new_node.primary_ip:
1547
      if not rpc.call_node_tcp_ping(new_node.name,
1548
                                    constants.LOCALHOST_IP_ADDRESS,
1549
                                    new_node.secondary_ip,
1550
                                    constants.DEFAULT_NODED_PORT,
1551
                                    10, False):
1552
        raise errors.OpExecError("Node claims it doesn't have the"
1553
                                 " secondary ip you gave (%s).\n"
1554
                                 "Please fix and re-run this command." %
1555
                                 new_node.secondary_ip)
1556

    
1557
    success, msg = ssh.VerifyNodeHostname(node)
1558
    if not success:
1559
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
1560
                               " than the one the resolver gives: %s.\n"
1561
                               "Please fix and re-run this command." %
1562
                               (node, msg))
1563

    
1564
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1565
    # including the node just added
1566
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1567
    dist_nodes = self.cfg.GetNodeList() + [node]
1568
    if myself.name in dist_nodes:
1569
      dist_nodes.remove(myself.name)
1570

    
1571
    logger.Debug("Copying hosts and known_hosts to all nodes")
1572
    for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
1573
      result = rpc.call_upload_file(dist_nodes, fname)
1574
      for to_node in dist_nodes:
1575
        if not result[to_node]:
1576
          logger.Error("copy of file %s to node %s failed" %
1577
                       (fname, to_node))
1578

    
1579
    to_copy = ss.GetFileList()
1580
    for fname in to_copy:
1581
      if not ssh.CopyFileToNode(node, fname):
1582
        logger.Error("could not copy file %s to node %s" % (fname, node))
1583

    
1584
    logger.Info("adding node %s to cluster.conf" % node)
1585
    self.cfg.AddNode(new_node)
1586

    
1587

    
1588
class LUMasterFailover(LogicalUnit):
1589
  """Failover the master node to the current node.
1590

1591
  This is a special LU in that it must run on a non-master node.
1592

1593
  """
1594
  HPATH = "master-failover"
1595
  HTYPE = constants.HTYPE_CLUSTER
1596
  REQ_MASTER = False
1597
  _OP_REQP = []
1598

    
1599
  def BuildHooksEnv(self):
1600
    """Build hooks env.
1601

1602
    This will run on the new master only in the pre phase, and on all
1603
    the nodes in the post phase.
1604

1605
    """
1606
    env = {
1607
      "OP_TARGET": self.new_master,
1608
      "NEW_MASTER": self.new_master,
1609
      "OLD_MASTER": self.old_master,
1610
      }
1611
    return env, [self.new_master], self.cfg.GetNodeList()
1612

    
1613
  def CheckPrereq(self):
1614
    """Check prerequisites.
1615

1616
    This checks that we are not already the master.
1617

1618
    """
1619
    self.new_master = utils.HostInfo().name
1620
    self.old_master = self.sstore.GetMasterNode()
1621

    
1622
    if self.old_master == self.new_master:
      raise errors.OpPrereqError("This command must be run on the node"
                                 " where you want the new master to be.\n"
                                 "%s is already the master" %
                                 self.old_master)
1627

    
1628
  def Exec(self, feedback_fn):
1629
    """Failover the master node.
1630

1631
    This command, when run on a non-master node, will cause the current
1632
    master to cease being master, and the non-master to become new
1633
    master.
1634

1635
    """
1636
    #TODO: do not rely on gethostname returning the FQDN
1637
    logger.Info("setting master to %s, old master: %s" %
1638
                (self.new_master, self.old_master))
1639

    
1640
    if not rpc.call_node_stop_master(self.old_master):
      logger.Error("could not disable the master role on the old master"
                   " %s, please disable manually" % self.old_master)
1643

    
1644
    ss = self.sstore
1645
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1646
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
1647
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
1648
      logger.Error("could not distribute the new simple store master file"
1649
                   " to the other nodes, please check.")
1650

    
1651
    if not rpc.call_node_start_master(self.new_master):
1652
      logger.Error("could not start the master role on the new master"
1653
                   " %s, please check" % self.new_master)
1654
      feedback_fn("Error in activating the master IP on the new master,\n"
1655
                  "please fix manually.")
1656

    
1657

    
1658

    
1659
class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      }

    return result
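
# Editor's note: an illustrative result of LUQueryClusterInfo.Exec; the
# string values below are made up and will differ on a real cluster:
#
#   {"name": "cluster.example.com",
#    "software_version": constants.RELEASE_VERSION,
#    "protocol_version": constants.PROTOCOL_VERSION,
#    "config_version": constants.CONFIG_VERSION,
#    "os_api_version": constants.OS_API_VERSION,
#    "export_version": constants.EXPORT_VERSION,
#    "master": "node1.example.com",
#    "architecture": ("64bit", "x86_64")}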
1688

    
1689

    
1690
class LUClusterCopyFile(NoHooksLU):
1691
  """Copy file to cluster.
1692

1693
  """
1694
  _OP_REQP = ["nodes", "filename"]
1695

    
1696
  def CheckPrereq(self):
1697
    """Check prerequisites.
1698

1699
    It should check that the named file exists and that the given list
1700
    of nodes is valid.
1701

1702
    """
1703
    if not os.path.exists(self.op.filename):
1704
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1705

    
1706
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1707

    
1708
  def Exec(self, feedback_fn):
1709
    """Copy a file from master to some nodes.
1710

1711
    Args:
1712
      opts - class with options as members
1713
      args - list containing a single element, the file name
1714
    Opts used:
1715
      nodes - list containing the name of target nodes; if empty, all nodes
1716

1717
    """
1718
    filename = self.op.filename
1719

    
1720
    myname = utils.HostInfo().name
1721

    
1722
    for node in self.nodes:
1723
      if node == myname:
1724
        continue
1725
      if not ssh.CopyFileToNode(node, filename):
1726
        logger.Error("Copy of file %s to node %s failed" % (filename, node))
1727

    
1728

    
1729
class LUDumpClusterConfig(NoHooksLU):
1730
  """Return a text-representation of the cluster-config.
1731

1732
  """
1733
  _OP_REQP = []
1734

    
1735
  def CheckPrereq(self):
1736
    """No prerequisites.
1737

1738
    """
1739
    pass
1740

    
1741
  def Exec(self, feedback_fn):
1742
    """Dump a representation of the cluster config to the standard output.
1743

1744
    """
1745
    return self.cfg.DumpConfig()
1746

    
1747

    
1748
class LURunClusterCommand(NoHooksLU):
1749
  """Run a command on some nodes.
1750

1751
  """
1752
  _OP_REQP = ["command", "nodes"]
1753

    
1754
  def CheckPrereq(self):
1755
    """Check prerequisites.
1756

1757
    It checks that the given list of nodes is valid.
1758

1759
    """
1760
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1761

    
1762
  def Exec(self, feedback_fn):
1763
    """Run a command on some nodes.
1764

1765
    """
1766
    data = []
1767
    for node in self.nodes:
1768
      result = ssh.SSHCall(node, "root", self.op.command)
1769
      data.append((node, result.output, result.exit_code))
1770

    
1771
    return data
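
# Editor's note: the list returned by LURunClusterCommand.Exec above holds
# one (node, output, exit_code) tuple per target node; for example
# (hypothetical values):
#
#   [("node1.example.com", "Linux\n", 0),
#    ("node2.example.com", "", 255)]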
1772

    
1773

    
1774
class LUActivateInstanceDisks(NoHooksLU):
1775
  """Bring up an instance's disks.
1776

1777
  """
1778
  _OP_REQP = ["instance_name"]
1779

    
1780
  def CheckPrereq(self):
1781
    """Check prerequisites.
1782

1783
    This checks that the instance is in the cluster.
1784

1785
    """
1786
    instance = self.cfg.GetInstanceInfo(
1787
      self.cfg.ExpandInstanceName(self.op.instance_name))
1788
    if instance is None:
1789
      raise errors.OpPrereqError("Instance '%s' not known" %
1790
                                 self.op.instance_name)
1791
    self.instance = instance
1792

    
1793

    
1794
  def Exec(self, feedback_fn):
1795
    """Activate the disks.
1796

1797
    """
1798
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1799
    if not disks_ok:
1800
      raise errors.OpExecError("Cannot activate block devices")
1801

    
1802
    return disks_info
1803

    
1804

    
1805
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1806
  """Prepare the block devices for an instance.
1807

1808
  This sets up the block devices on all nodes.
1809

1810
  Args:
1811
    instance: a ganeti.objects.Instance object
1812
    ignore_secondaries: if true, errors on secondary nodes won't result
1813
                        in an error return from the function
1814

1815
  Returns:
1816
    false if the operation failed
1817
    list of (host, instance_visible_name, node_visible_name) if the operation
1818
         suceeded with the mapping from node devices to instance devices
1819
  """
1820
  device_info = []
1821
  disks_ok = True
1822
  for inst_disk in instance.disks:
1823
    master_result = None
1824
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1825
      cfg.SetDiskID(node_disk, node)
1826
      is_primary = node == instance.primary_node
1827
      result = rpc.call_blockdev_assemble(node, node_disk,
1828
                                          instance.name, is_primary)
1829
      if not result:
1830
        logger.Error("could not prepare block device %s on node %s (is_pri"
1831
                     "mary=%s)" % (inst_disk.iv_name, node, is_primary))
1832
        if is_primary or not ignore_secondaries:
1833
          disks_ok = False
1834
      if is_primary:
1835
        master_result = result
1836
    device_info.append((instance.primary_node, inst_disk.iv_name,
1837
                        master_result))
1838

    
1839
  # leave the disks configured for the primary node
1840
  # this is a workaround that would be fixed better by
1841
  # improving the logical/physical id handling
1842
  for disk in instance.disks:
1843
    cfg.SetDiskID(disk, instance.primary_node)
1844

    
1845
  return disks_ok, device_info
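
# Editor's note: illustrative shape of the (disks_ok, device_info) value
# returned by _AssembleInstanceDisks above; the device paths are made up and
# depend on the disk template in use:
#
#   (True, [("node1.example.com", "sda", "/dev/drbd0"),
#           ("node1.example.com", "sdb", "/dev/drbd1")])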
1846

    
1847

    
1848
def _StartInstanceDisks(cfg, instance, force):
1849
  """Start the disks of an instance.
1850

1851
  """
1852
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1853
                                           ignore_secondaries=force)
1854
  if not disks_ok:
1855
    _ShutdownInstanceDisks(instance, cfg)
1856
    if force is not None and not force:
1857
      logger.Error("If the message above refers to a secondary node,"
1858
                   " you can retry the operation using '--force'.")
1859
    raise errors.OpExecError("Disk consistency error")
1860

    
1861

    
1862
class LUDeactivateInstanceDisks(NoHooksLU):
1863
  """Shutdown an instance's disks.
1864

1865
  """
1866
  _OP_REQP = ["instance_name"]
1867

    
1868
  def CheckPrereq(self):
1869
    """Check prerequisites.
1870

1871
    This checks that the instance is in the cluster.
1872

1873
    """
1874
    instance = self.cfg.GetInstanceInfo(
1875
      self.cfg.ExpandInstanceName(self.op.instance_name))
1876
    if instance is None:
1877
      raise errors.OpPrereqError("Instance '%s' not known" %
1878
                                 self.op.instance_name)
1879
    self.instance = instance
1880

    
1881
  def Exec(self, feedback_fn):
1882
    """Deactivate the disks
1883

1884
    """
1885
    instance = self.instance
1886
    ins_l = rpc.call_instance_list([instance.primary_node])
1887
    ins_l = ins_l[instance.primary_node]
1888
    if not type(ins_l) is list:
1889
      raise errors.OpExecError("Can't contact node '%s'" %
1890
                               instance.primary_node)
1891

    
1892
    if self.instance.name in ins_l:
1893
      raise errors.OpExecError("Instance is running, can't shutdown"
1894
                               " block devices.")
1895

    
1896
    _ShutdownInstanceDisks(instance, self.cfg)
1897

    
1898

    
1899
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are ignored for
  the purposes of the return value; errors on any other node, or on the
  primary node when ignore_primary is false, make the function return
  False.

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result
1918

    
1919

    
1920
class LUStartupInstance(LogicalUnit):
1921
  """Starts an instance.
1922

1923
  """
1924
  HPATH = "instance-start"
1925
  HTYPE = constants.HTYPE_INSTANCE
1926
  _OP_REQP = ["instance_name", "force"]
1927

    
1928
  def BuildHooksEnv(self):
1929
    """Build hooks env.
1930

1931
    This runs on master, primary and secondary nodes of the instance.
1932

1933
    """
1934
    env = {
1935
      "FORCE": self.op.force,
1936
      }
1937
    env.update(_BuildInstanceHookEnvByObject(self.instance))
1938
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1939
          list(self.instance.secondary_nodes))
1940
    return env, nl, nl
1941

    
1942
  def CheckPrereq(self):
1943
    """Check prerequisites.
1944

1945
    This checks that the instance is in the cluster.
1946

1947
    """
1948
    instance = self.cfg.GetInstanceInfo(
1949
      self.cfg.ExpandInstanceName(self.op.instance_name))
1950
    if instance is None:
1951
      raise errors.OpPrereqError("Instance '%s' not known" %
1952
                                 self.op.instance_name)
1953

    
1954
    # check bridges existance
1955
    _CheckInstanceBridgesExist(instance)
1956

    
1957
    self.instance = instance
1958
    self.op.instance_name = instance.name
1959

    
1960
  def Exec(self, feedback_fn):
1961
    """Start the instance.
1962

1963
    """
1964
    instance = self.instance
1965
    force = self.op.force
1966
    extra_args = getattr(self.op, "extra_args", "")
1967

    
1968
    node_current = instance.primary_node
1969

    
1970
    nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
1971
    if not nodeinfo:
1972
      raise errors.OpExecError("Could not contact node %s for infos" %
1973
                               (node_current))
1974

    
1975
    freememory = nodeinfo[node_current]['memory_free']
1976
    memory = instance.memory
1977
    if memory > freememory:
1978
      raise errors.OpExecError("Not enough memory to start instance"
1979
                               " %s on node %s"
1980
                               " needed %s MiB, available %s MiB" %
1981
                               (instance.name, node_current, memory,
1982
                                freememory))
1983

    
1984
    _StartInstanceDisks(self.cfg, instance, force)
1985

    
1986
    if not rpc.call_instance_start(node_current, instance, extra_args):
1987
      _ShutdownInstanceDisks(instance, self.cfg)
1988
      raise errors.OpExecError("Could not start instance")
1989

    
1990
    self.cfg.MarkInstanceUp(instance.name)
1991

    
1992

    
1993
class LURebootInstance(LogicalUnit):
1994
  """Reboot an instance.
1995

1996
  """
1997
  HPATH = "instance-reboot"
1998
  HTYPE = constants.HTYPE_INSTANCE
1999
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2000

    
2001
  def BuildHooksEnv(self):
2002
    """Build hooks env.
2003

2004
    This runs on master, primary and secondary nodes of the instance.
2005

2006
    """
2007
    env = {
2008
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2009
      }
2010
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2011
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2012
          list(self.instance.secondary_nodes))
2013
    return env, nl, nl
2014

    
2015
  def CheckPrereq(self):
2016
    """Check prerequisites.
2017

2018
    This checks that the instance is in the cluster.
2019

2020
    """
2021
    instance = self.cfg.GetInstanceInfo(
2022
      self.cfg.ExpandInstanceName(self.op.instance_name))
2023
    if instance is None:
2024
      raise errors.OpPrereqError("Instance '%s' not known" %
2025
                                 self.op.instance_name)
2026

    
2027
    # check bridges existence
2028
    _CheckInstanceBridgesExist(instance)
2029

    
2030
    self.instance = instance
2031
    self.op.instance_name = instance.name
2032

    
2033
  def Exec(self, feedback_fn):
2034
    """Reboot the instance.
2035

2036
    """
2037
    instance = self.instance
2038
    ignore_secondaries = self.op.ignore_secondaries
2039
    reboot_type = self.op.reboot_type
2040
    extra_args = getattr(self.op, "extra_args", "")
2041

    
2042
    node_current = instance.primary_node
2043

    
2044
    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2045
                           constants.INSTANCE_REBOOT_HARD,
2046
                           constants.INSTANCE_REBOOT_FULL]:
2047
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2048
                                  (constants.INSTANCE_REBOOT_SOFT,
2049
                                   constants.INSTANCE_REBOOT_HARD,
2050
                                   constants.INSTANCE_REBOOT_FULL))
2051

    
2052
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2053
                       constants.INSTANCE_REBOOT_HARD]:
2054
      if not rpc.call_instance_reboot(node_current, instance,
2055
                                      reboot_type, extra_args):
2056
        raise errors.OpExecError("Could not reboot instance")
2057
    else:
2058
      if not rpc.call_instance_shutdown(node_current, instance):
2059
        raise errors.OpExecError("could not shutdown instance for full reboot")
2060
      _ShutdownInstanceDisks(instance, self.cfg)
2061
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2062
      if not rpc.call_instance_start(node_current, instance, extra_args):
2063
        _ShutdownInstanceDisks(instance, self.cfg)
2064
        raise errors.OpExecError("Could not start instance for full reboot")
2065

    
2066
    self.cfg.MarkInstanceUp(instance.name)
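
# Editor's note: summary of LURebootInstance.Exec above.  SOFT and HARD
# reboots are delegated to the node daemon via rpc.call_instance_reboot,
# while a FULL reboot is emulated as shutdown + disk restart + start.  A
# hypothetical opcode, with field names taken from this LU's _OP_REQP:
#
#   op = opcodes.OpRebootInstance(instance_name="inst1.example.com",
#                                 reboot_type=constants.INSTANCE_REBOOT_HARD,
#                                 ignore_secondaries=False)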
2067

    
2068

    
2069
class LUShutdownInstance(LogicalUnit):
2070
  """Shutdown an instance.
2071

2072
  """
2073
  HPATH = "instance-stop"
2074
  HTYPE = constants.HTYPE_INSTANCE
2075
  _OP_REQP = ["instance_name"]
2076

    
2077
  def BuildHooksEnv(self):
2078
    """Build hooks env.
2079

2080
    This runs on master, primary and secondary nodes of the instance.
2081

2082
    """
2083
    env = _BuildInstanceHookEnvByObject(self.instance)
2084
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2085
          list(self.instance.secondary_nodes))
2086
    return env, nl, nl
2087

    
2088
  def CheckPrereq(self):
2089
    """Check prerequisites.
2090

2091
    This checks that the instance is in the cluster.
2092

2093
    """
2094
    instance = self.cfg.GetInstanceInfo(
2095
      self.cfg.ExpandInstanceName(self.op.instance_name))
2096
    if instance is None:
2097
      raise errors.OpPrereqError("Instance '%s' not known" %
2098
                                 self.op.instance_name)
2099
    self.instance = instance
2100

    
2101
  def Exec(self, feedback_fn):
2102
    """Shutdown the instance.
2103

2104
    """
2105
    instance = self.instance
2106
    node_current = instance.primary_node
2107
    if not rpc.call_instance_shutdown(node_current, instance):
2108
      logger.Error("could not shutdown instance")
2109

    
2110
    self.cfg.MarkInstanceDown(instance.name)
2111
    _ShutdownInstanceDisks(instance, self.cfg)
2112

    
2113

    
2114
class LUReinstallInstance(LogicalUnit):
2115
  """Reinstall an instance.
2116

2117
  """
2118
  HPATH = "instance-reinstall"
2119
  HTYPE = constants.HTYPE_INSTANCE
2120
  _OP_REQP = ["instance_name"]
2121

    
2122
  def BuildHooksEnv(self):
2123
    """Build hooks env.
2124

2125
    This runs on master, primary and secondary nodes of the instance.
2126

2127
    """
2128
    env = _BuildInstanceHookEnvByObject(self.instance)
2129
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2130
          list(self.instance.secondary_nodes))
2131
    return env, nl, nl
2132

    
2133
  def CheckPrereq(self):
2134
    """Check prerequisites.
2135

2136
    This checks that the instance is in the cluster and is not running.
2137

2138
    """
2139
    instance = self.cfg.GetInstanceInfo(
2140
      self.cfg.ExpandInstanceName(self.op.instance_name))
2141
    if instance is None:
2142
      raise errors.OpPrereqError("Instance '%s' not known" %
2143
                                 self.op.instance_name)
2144
    if instance.disk_template == constants.DT_DISKLESS:
2145
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2146
                                 self.op.instance_name)
2147
    if instance.status != "down":
2148
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2149
                                 self.op.instance_name)
2150
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2151
    if remote_info:
2152
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2153
                                 (self.op.instance_name,
2154
                                  instance.primary_node))
2155

    
2156
    self.op.os_type = getattr(self.op, "os_type", None)
2157
    if self.op.os_type is not None:
2158
      # OS verification
2159
      pnode = self.cfg.GetNodeInfo(
2160
        self.cfg.ExpandNodeName(instance.primary_node))
2161
      if pnode is None:
2162
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2163
                                   instance.primary_node)
2164
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2165
      if not os_obj:
2166
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2167
                                   " primary node"  % self.op.os_type)
2168

    
2169
    self.instance = instance
2170

    
2171
  def Exec(self, feedback_fn):
2172
    """Reinstall the instance.
2173

2174
    """
2175
    inst = self.instance
2176

    
2177
    if self.op.os_type is not None:
2178
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2179
      inst.os = self.op.os_type
2180
      self.cfg.AddInstance(inst)
2181

    
2182
    _StartInstanceDisks(self.cfg, inst, None)
2183
    try:
2184
      feedback_fn("Running the instance OS create scripts...")
2185
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2186
        raise errors.OpExecError("Could not install OS for instance %s "
2187
                                 "on node %s" %
2188
                                 (inst.name, inst.primary_node))
2189
    finally:
2190
      _ShutdownInstanceDisks(inst, self.cfg)
2191

    
2192

    
2193
class LURenameInstance(LogicalUnit):
2194
  """Rename an instance.
2195

2196
  """
2197
  HPATH = "instance-rename"
2198
  HTYPE = constants.HTYPE_INSTANCE
2199
  _OP_REQP = ["instance_name", "new_name"]
2200

    
2201
  def BuildHooksEnv(self):
2202
    """Build hooks env.
2203

2204
    This runs on master, primary and secondary nodes of the instance.
2205

2206
    """
2207
    env = _BuildInstanceHookEnvByObject(self.instance)
2208
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2209
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2210
          list(self.instance.secondary_nodes))
2211
    return env, nl, nl
2212

    
2213
  def CheckPrereq(self):
2214
    """Check prerequisites.
2215

2216
    This checks that the instance is in the cluster and is not running.
2217

2218
    """
2219
    instance = self.cfg.GetInstanceInfo(
2220
      self.cfg.ExpandInstanceName(self.op.instance_name))
2221
    if instance is None:
2222
      raise errors.OpPrereqError("Instance '%s' not known" %
2223
                                 self.op.instance_name)
2224
    if instance.status != "down":
2225
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2226
                                 self.op.instance_name)
2227
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2228
    if remote_info:
2229
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2230
                                 (self.op.instance_name,
2231
                                  instance.primary_node))
2232
    self.instance = instance
2233

    
2234
    # new name verification
2235
    name_info = utils.HostInfo(self.op.new_name)
2236

    
2237
    self.op.new_name = new_name = name_info.name
2238
    if not getattr(self.op, "ignore_ip", False):
2239
      command = ["fping", "-q", name_info.ip]
2240
      result = utils.RunCmd(command)
2241
      if not result.failed:
2242
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2243
                                   (name_info.ip, new_name))
2244

    
2245

    
2246
  def Exec(self, feedback_fn):
2247
    """Reinstall the instance.
2248

2249
    """
2250
    inst = self.instance
2251
    old_name = inst.name
2252

    
2253
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2254

    
2255
    # re-read the instance from the configuration after rename
2256
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2257

    
2258
    _StartInstanceDisks(self.cfg, inst, None)
2259
    try:
2260
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2261
                                          "sda", "sdb"):
2262
        msg = ("Could run OS rename script for instance %s\n"
2263
               "on node %s\n"
2264
               "(but the instance has been renamed in Ganeti)" %
2265
               (inst.name, inst.primary_node))
2266
        logger.Error(msg)
2267
    finally:
2268
      _ShutdownInstanceDisks(inst, self.cfg)
2269

    
2270

    
2271
class LURemoveInstance(LogicalUnit):
2272
  """Remove an instance.
2273

2274
  """
2275
  HPATH = "instance-remove"
2276
  HTYPE = constants.HTYPE_INSTANCE
2277
  _OP_REQP = ["instance_name"]
2278

    
2279
  def BuildHooksEnv(self):
2280
    """Build hooks env.
2281

2282
    This runs on master, primary and secondary nodes of the instance.
2283

2284
    """
2285
    env = _BuildInstanceHookEnvByObject(self.instance)
2286
    nl = [self.sstore.GetMasterNode()]
2287
    return env, nl, nl
2288

    
2289
  def CheckPrereq(self):
2290
    """Check prerequisites.
2291

2292
    This checks that the instance is in the cluster.
2293

2294
    """
2295
    instance = self.cfg.GetInstanceInfo(
2296
      self.cfg.ExpandInstanceName(self.op.instance_name))
2297
    if instance is None:
2298
      raise errors.OpPrereqError("Instance '%s' not known" %
2299
                                 self.op.instance_name)
2300
    self.instance = instance
2301

    
2302
  def Exec(self, feedback_fn):
2303
    """Remove the instance.
2304

2305
    """
2306
    instance = self.instance
2307
    logger.Info("shutting down instance %s on node %s" %
2308
                (instance.name, instance.primary_node))
2309

    
2310
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2311
      if self.op.ignore_failures:
2312
        feedback_fn("Warning: can't shutdown instance")
2313
      else:
2314
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2315
                                 (instance.name, instance.primary_node))
2316

    
2317
    logger.Info("removing block devices for instance %s" % instance.name)
2318

    
2319
    if not _RemoveDisks(instance, self.cfg):
2320
      if self.op.ignore_failures:
2321
        feedback_fn("Warning: can't remove instance's disks")
2322
      else:
2323
        raise errors.OpExecError("Can't remove instance's disks")
2324

    
2325
    logger.Info("removing instance %s out of cluster config" % instance.name)
2326

    
2327
    self.cfg.RemoveInstance(instance.name)
2328

    
2329

    
2330
class LUQueryInstances(NoHooksLU):
2331
  """Logical unit for querying instances.
2332

2333
  """
2334
  _OP_REQP = ["output_fields", "names"]
2335

    
2336
  def CheckPrereq(self):
2337
    """Check prerequisites.
2338

2339
    This checks that the fields required are valid output fields.
2340

2341
    """
2342
    self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
2343
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2344
                               "admin_state", "admin_ram",
2345
                               "disk_template", "ip", "mac", "bridge",
2346
                               "sda_size", "sdb_size"],
2347
                       dynamic=self.dynamic_fields,
2348
                       selected=self.op.output_fields)
2349

    
2350
    self.wanted = _GetWantedInstances(self, self.op.names)
2351

    
2352
  def Exec(self, feedback_fn):
2353
    """Computes the list of nodes and their attributes.
2354

2355
    """
2356
    instance_names = self.wanted
2357
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2358
                     in instance_names]
2359

    
2360
    # begin data gathering
2361

    
2362
    nodes = frozenset([inst.primary_node for inst in instance_list])
2363

    
2364
    bad_nodes = []
2365
    if self.dynamic_fields.intersection(self.op.output_fields):
2366
      live_data = {}
2367
      node_data = rpc.call_all_instances_info(nodes)
2368
      for name in nodes:
2369
        result = node_data[name]
2370
        if result:
2371
          live_data.update(result)
2372
        elif result == False:
2373
          bad_nodes.append(name)
2374
        # else no instance is alive
2375
    else:
2376
      live_data = dict([(name, {}) for name in instance_names])
2377

    
2378
    # end data gathering
2379

    
2380
    output = []
2381
    for instance in instance_list:
2382
      iout = []
2383
      for field in self.op.output_fields:
2384
        if field == "name":
2385
          val = instance.name
2386
        elif field == "os":
2387
          val = instance.os
2388
        elif field == "pnode":
2389
          val = instance.primary_node
2390
        elif field == "snodes":
2391
          val = list(instance.secondary_nodes)
2392
        elif field == "admin_state":
2393
          val = (instance.status != "down")
2394
        elif field == "oper_state":
2395
          if instance.primary_node in bad_nodes:
2396
            val = None
2397
          else:
2398
            val = bool(live_data.get(instance.name))
2399
        elif field == "admin_ram":
2400
          val = instance.memory
2401
        elif field == "oper_ram":
2402
          if instance.primary_node in bad_nodes:
2403
            val = None
2404
          elif instance.name in live_data:
2405
            val = live_data[instance.name].get("memory", "?")
2406
          else:
2407
            val = "-"
2408
        elif field == "disk_template":
2409
          val = instance.disk_template
2410
        elif field == "ip":
2411
          val = instance.nics[0].ip
2412
        elif field == "bridge":
2413
          val = instance.nics[0].bridge
2414
        elif field == "mac":
2415
          val = instance.nics[0].mac
2416
        elif field == "sda_size" or field == "sdb_size":
2417
          disk = instance.FindDisk(field[:3])
2418
          if disk is None:
2419
            val = None
2420
          else:
2421
            val = disk.size
2422
        else:
2423
          raise errors.ParameterError(field)
2424
        iout.append(val)
2425
      output.append(iout)
2426

    
2427
    return output
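
# Editor's note: illustrative output of LUQueryInstances.Exec for
# output_fields=["name", "pnode", "admin_state", "oper_ram"]; one row per
# instance, values made up:
#
#   [["inst1.example.com", "node1.example.com", True, 128],
#    ["inst2.example.com", "node2.example.com", False, "-"]]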
2428

    
2429

    
2430
class LUFailoverInstance(LogicalUnit):
2431
  """Failover an instance.
2432

2433
  """
2434
  HPATH = "instance-failover"
2435
  HTYPE = constants.HTYPE_INSTANCE
2436
  _OP_REQP = ["instance_name", "ignore_consistency"]
2437

    
2438
  def BuildHooksEnv(self):
2439
    """Build hooks env.
2440

2441
    This runs on master, primary and secondary nodes of the instance.
2442

2443
    """
2444
    env = {
2445
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2446
      }
2447
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2448
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2449
    return env, nl, nl
2450

    
2451
  def CheckPrereq(self):
2452
    """Check prerequisites.
2453

2454
    This checks that the instance is in the cluster.
2455

2456
    """
2457
    instance = self.cfg.GetInstanceInfo(
2458
      self.cfg.ExpandInstanceName(self.op.instance_name))
2459
    if instance is None:
2460
      raise errors.OpPrereqError("Instance '%s' not known" %
2461
                                 self.op.instance_name)
2462

    
2463
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2464
      raise errors.OpPrereqError("Instance's disk layout is not"
2465
                                 " network mirrored, cannot failover.")
2466

    
2467
    secondary_nodes = instance.secondary_nodes
2468
    if not secondary_nodes:
2469
      raise errors.ProgrammerError("no secondary node but using "
2470
                                   "DT_REMOTE_RAID1 template")
2471

    
2472
    # check memory requirements on the secondary node
2473
    target_node = secondary_nodes[0]
2474
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2475
    info = nodeinfo.get(target_node, None)
2476
    if not info:
2477
      raise errors.OpPrereqError("Cannot get current information"
2478
                                 " from node '%s'" % nodeinfo)
2479
    if instance.memory > info['memory_free']:
2480
      raise errors.OpPrereqError("Not enough memory on target node %s."
2481
                                 " %d MB available, %d MB required" %
2482
                                 (target_node, info['memory_free'],
2483
                                  instance.memory))
2484

    
2485
    # check bridge existence
2486
    brlist = [nic.bridge for nic in instance.nics]
2487
    if not rpc.call_bridges_exist(target_node, brlist):
2488
      raise errors.OpPrereqError("One or more target bridges %s does not"
2489
                                 " exist on destination node '%s'" %
2490
                                 (brlist, target_node))
2491

    
2492
    self.instance = instance
2493

    
2494
  def Exec(self, feedback_fn):
2495
    """Failover an instance.
2496

2497
    The failover is done by shutting it down on its present node and
2498
    starting it on the secondary.
2499

2500
    """
2501
    instance = self.instance
2502

    
2503
    source_node = instance.primary_node
2504
    target_node = instance.secondary_nodes[0]
2505

    
2506
    feedback_fn("* checking disk consistency between source and target")
2507
    for dev in instance.disks:
2508
      # for remote_raid1, these are md over drbd
2509
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2510
        if not self.op.ignore_consistency:
2511
          raise errors.OpExecError("Disk %s is degraded on target node,"
2512
                                   " aborting failover." % dev.iv_name)
2513

    
2514
    feedback_fn("* checking target node resource availability")
2515
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2516

    
2517
    if not nodeinfo:
2518
      raise errors.OpExecError("Could not contact target node %s." %
2519
                               target_node)
2520

    
2521
    free_memory = int(nodeinfo[target_node]['memory_free'])
2522
    memory = instance.memory
2523
    if memory > free_memory:
2524
      raise errors.OpExecError("Not enough memory to create instance %s on"
2525
                               " node %s. needed %s MiB, available %s MiB" %
2526
                               (instance.name, target_node, memory,
2527
                                free_memory))
2528

    
2529
    feedback_fn("* shutting down instance on source node")
2530
    logger.Info("Shutting down instance %s on node %s" %
2531
                (instance.name, source_node))
2532

    
2533
    if not rpc.call_instance_shutdown(source_node, instance):
2534
      if self.op.ignore_consistency:
2535
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2536
                     " anyway. Please make sure node %s is down"  %
2537
                     (instance.name, source_node, source_node))
2538
      else:
2539
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2540
                                 (instance.name, source_node))
2541

    
2542
    feedback_fn("* deactivating the instance's disks on source node")
2543
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2544
      raise errors.OpExecError("Can't shut down the instance's disks.")
2545

    
2546
    instance.primary_node = target_node
2547
    # distribute new instance config to the other nodes
2548
    self.cfg.AddInstance(instance)
2549

    
2550
    feedback_fn("* activating the instance's disks on target node")
2551
    logger.Info("Starting instance %s on node %s" %
2552
                (instance.name, target_node))
2553

    
2554
    disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2555
                                             ignore_secondaries=True)
2556
    if not disks_ok:
2557
      _ShutdownInstanceDisks(instance, self.cfg)
2558
      raise errors.OpExecError("Can't activate the instance's disks")
2559

    
2560
    feedback_fn("* starting the instance on the target node")
2561
    if not rpc.call_instance_start(target_node, instance, None):
2562
      _ShutdownInstanceDisks(instance, self.cfg)
2563
      raise errors.OpExecError("Could not start instance %s on node %s." %
2564
                               (instance.name, target_node))
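
# Editor's note: condensed view of the sequence implemented by
# LUFailoverInstance.Exec above; no data is copied, so this only works for
# network-mirrored disk templates:
#
#   1. _CheckDiskConsistency on the target node for each disk
#   2. rpc.call_instance_shutdown(source_node, instance)
#   3. _ShutdownInstanceDisks(instance, cfg, ignore_primary=True)
#   4. instance.primary_node = target_node; cfg.AddInstance(instance)
#   5. _AssembleInstanceDisks(...) and rpc.call_instance_start(target_node, ...)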
2565

    
2566

    
2567
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2568
  """Create a tree of block devices on the primary node.
2569

2570
  This always creates all devices.
2571

2572
  """
2573
  if device.children:
2574
    for child in device.children:
2575
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2576
        return False
2577

    
2578
  cfg.SetDiskID(device, node)
2579
  new_id = rpc.call_blockdev_create(node, device, device.size,
2580
                                    instance.name, True, info)
2581
  if not new_id:
2582
    return False
2583
  if device.physical_id is None:
2584
    device.physical_id = new_id
2585
  return True
2586

    
2587

    
2588
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2589
  """Create a tree of block devices on a secondary node.
2590

2591
  If this device type has to be created on secondaries, create it and
2592
  all its children.
2593

2594
  If not, just recurse to children keeping the same 'force' value.
2595

2596
  """
2597
  if device.CreateOnSecondary():
2598
    force = True
2599
  if device.children:
2600
    for child in device.children:
2601
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
2602
                                        child, force, info):
2603
        return False
2604

    
2605
  if not force:
2606
    return True
2607
  cfg.SetDiskID(device, node)
2608
  new_id = rpc.call_blockdev_create(node, device, device.size,
2609
                                    instance.name, False, info)
2610
  if not new_id:
2611
    return False
2612
  if device.physical_id is None:
2613
    device.physical_id = new_id
2614
  return True
2615

    
2616

    
2617
def _GenerateUniqueNames(cfg, exts):
2618
  """Generate a suitable LV name.
2619

2620
  This will generate a logical volume name for the given instance.
2621

2622
  """
2623
  results = []
2624
  for val in exts:
2625
    new_id = cfg.GenerateUniqueID()
2626
    results.append("%s%s" % (new_id, val))
2627
  return results
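
# Editor's note: example of what _GenerateUniqueNames produces; the unique
# IDs are made up here (cfg.GenerateUniqueID returns an opaque unique
# identifier string):
#
#   _GenerateUniqueNames(cfg, [".sda", ".sdb"])
#   => ["0b8e3c1a-1f2d-4c5e-9a7b-123456789abc.sda",
#       "7d6c5b4a-3e2f-4a1b-8c9d-abcdef012345.sdb"]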
2628

    
2629

    
2630
def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2631
  """Generate a drbd device complete with its children.
2632

2633
  """
2634
  port = cfg.AllocatePort()
2635
  vgname = cfg.GetVGName()
2636
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2637
                          logical_id=(vgname, names[0]))
2638
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2639
                          logical_id=(vgname, names[1]))
2640
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2641
                          logical_id = (primary, secondary, port),
2642
                          children = [dev_data, dev_meta])
2643
  return drbd_dev
2644

    
2645

    
2646
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2647
  """Generate a drbd8 device complete with its children.
2648

2649
  """
2650
  port = cfg.AllocatePort()
2651
  vgname = cfg.GetVGName()
2652
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2653
                          logical_id=(vgname, names[0]))
2654
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2655
                          logical_id=(vgname, names[1]))
2656
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2657
                          logical_id = (primary, secondary, port),
2658
                          children = [dev_data, dev_meta],
2659
                          iv_name=iv_name)
2660
  return drbd_dev
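
# Editor's note: the device tree built by _GenerateDRBD8Branch above,
# roughly (sizes in MB; the 128 MB child is the DRBD metadata volume):
#
#   Disk(LD_DRBD8, size=size, iv_name=iv_name,
#        logical_id=(primary, secondary, port),
#        children=[Disk(LD_LV, size=size, logical_id=(vgname, names[0])),
#                  Disk(LD_LV, size=128,  logical_id=(vgname, names[1]))])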
2661

    
2662
def _GenerateDiskTemplate(cfg, template_name,
2663
                          instance_name, primary_node,
2664
                          secondary_nodes, disk_sz, swap_sz):
2665
  """Generate the entire disk layout for a given template type.
2666

2667
  """
2668
  #TODO: compute space requirements
2669

    
2670
  vgname = cfg.GetVGName()
2671
  if template_name == "diskless":
2672
    disks = []
2673
  elif template_name == "plain":
2674
    if len(secondary_nodes) != 0:
2675
      raise errors.ProgrammerError("Wrong template configuration")
2676

    
2677
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2678
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2679
                           logical_id=(vgname, names[0]),
2680
                           iv_name = "sda")
2681
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2682
                           logical_id=(vgname, names[1]),
2683
                           iv_name = "sdb")
2684
    disks = [sda_dev, sdb_dev]
2685
  elif template_name == "local_raid1":
2686
    if len(secondary_nodes) != 0:
2687
      raise errors.ProgrammerError("Wrong template configuration")
2688

    
2689

    
2690
    names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2691
                                       ".sdb_m1", ".sdb_m2"])
2692
    sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2693
                              logical_id=(vgname, names[0]))
2694
    sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2695
                              logical_id=(vgname, names[1]))
2696
    md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
2697
                              size=disk_sz,
2698
                              children = [sda_dev_m1, sda_dev_m2])
2699
    sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2700
                              logical_id=(vgname, names[2]))
2701
    sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2702
                              logical_id=(vgname, names[3]))
2703
    md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
2704
                              size=swap_sz,
2705
                              children = [sdb_dev_m1, sdb_dev_m2])
2706
    disks = [md_sda_dev, md_sdb_dev]
2707
  elif template_name == constants.DT_REMOTE_RAID1:
2708
    if len(secondary_nodes) != 1:
2709
      raise errors.ProgrammerError("Wrong template configuration")
2710
    remote_node = secondary_nodes[0]
2711
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2712
                                       ".sdb_data", ".sdb_meta"])
2713
    drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2714
                                         disk_sz, names[0:2])
2715
    md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
2716
                              children = [drbd_sda_dev], size=disk_sz)
2717
    drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2718
                                         swap_sz, names[2:4])
2719
    md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
2720
                              children = [drbd_sdb_dev], size=swap_sz)
2721
    disks = [md_sda_dev, md_sdb_dev]
2722
  elif template_name == constants.DT_DRBD8:
2723
    if len(secondary_nodes) != 1:
2724
      raise errors.ProgrammerError("Wrong template configuration")
2725
    remote_node = secondary_nodes[0]
2726
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2727
                                       ".sdb_data", ".sdb_meta"])
2728
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2729
                                         disk_sz, names[0:2], "sda")
2730
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2731
                                         swap_sz, names[2:4], "sdb")
2732
    disks = [drbd_sda_dev, drbd_sdb_dev]
2733
  else:
2734
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2735
  return disks
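
# Editor's note: a hypothetical call to _GenerateDiskTemplate as issued by
# LUCreateInstance further down (sizes in MB, names made up):
#
#   disks = _GenerateDiskTemplate(cfg, constants.DT_DRBD8,
#                                 "inst1.example.com", "node1.example.com",
#                                 ["node2.example.com"], 10240, 4096)
#
# which returns the two drbd8 "sda"/"sdb" disk objects described above.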
2736

    
2737

    
2738
def _GetInstanceInfoText(instance):
2739
  """Compute that text that should be added to the disk's metadata.
2740

2741
  """
2742
  return "originstname+%s" % instance.name
2743

    
2744

    
2745
def _CreateDisks(cfg, instance):
2746
  """Create all disks for an instance.
2747

2748
  This abstracts away some work from AddInstance.
2749

2750
  Args:
2751
    instance: the instance object
2752

2753
  Returns:
2754
    True or False showing the success of the creation process
2755

2756
  """
2757
  info = _GetInstanceInfoText(instance)
2758

    
2759
  for device in instance.disks:
2760
    logger.Info("creating volume %s for instance %s" %
2761
              (device.iv_name, instance.name))
2762
    #HARDCODE
2763
    for secondary_node in instance.secondary_nodes:
2764
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2765
                                        device, False, info):
2766
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2767
                     (device.iv_name, device, secondary_node))
2768
        return False
2769
    #HARDCODE
2770
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2771
                                    instance, device, info):
2772
      logger.Error("failed to create volume %s on primary!" %
2773
                   device.iv_name)
2774
      return False
2775
  return True
2776

    
2777

    
2778
def _RemoveDisks(instance, cfg):
2779
  """Remove all disks for an instance.
2780

2781
  This abstracts away some work from `AddInstance()` and
2782
  `RemoveInstance()`. Note that in case some of the devices couldn't
2783
  be removed, the removal will continue with the other ones (compare
2784
  with `_CreateDisks()`).
2785

2786
  Args:
2787
    instance: the instance object
2788

2789
  Returns:
2790
    True or False showing the success of the removal process
2791

2792
  """
2793
  logger.Info("removing block devices for instance %s" % instance.name)
2794

    
2795
  result = True
2796
  for device in instance.disks:
2797
    for node, disk in device.ComputeNodeTree(instance.primary_node):
2798
      cfg.SetDiskID(disk, node)
2799
      if not rpc.call_blockdev_remove(node, disk):
2800
        logger.Error("could not remove block device %s on node %s,"
2801
                     " continuing anyway" %
2802
                     (device.iv_name, node))
2803
        result = False
2804
  return result
2805

    
2806

    
2807
class LUCreateInstance(LogicalUnit):
2808
  """Create an instance.
2809

2810
  """
2811
  HPATH = "instance-add"
2812
  HTYPE = constants.HTYPE_INSTANCE
2813
  _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2814
              "disk_template", "swap_size", "mode", "start", "vcpus",
2815
              "wait_for_sync", "ip_check"]
2816

    
2817
  def BuildHooksEnv(self):
2818
    """Build hooks env.
2819

2820
    This runs on master, primary and secondary nodes of the instance.
2821

2822
    """
2823
    env = {
2824
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2825
      "INSTANCE_DISK_SIZE": self.op.disk_size,
2826
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
2827
      "INSTANCE_ADD_MODE": self.op.mode,
2828
      }
2829
    if self.op.mode == constants.INSTANCE_IMPORT:
2830
      env["INSTANCE_SRC_NODE"] = self.op.src_node
2831
      env["INSTANCE_SRC_PATH"] = self.op.src_path
2832
      env["INSTANCE_SRC_IMAGE"] = self.src_image
2833

    
2834
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2835
      primary_node=self.op.pnode,
2836
      secondary_nodes=self.secondaries,
2837
      status=self.instance_status,
2838
      os_type=self.op.os_type,
2839
      memory=self.op.mem_size,
2840
      vcpus=self.op.vcpus,
2841
      nics=[(self.inst_ip, self.op.bridge)],
2842
    ))
2843

    
2844
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2845
          self.secondaries)
2846
    return env, nl, nl
2847

    
2848

    
2849
  def CheckPrereq(self):
2850
    """Check prerequisites.
2851

2852
    """
2853
    if self.op.mode not in (constants.INSTANCE_CREATE,
2854
                            constants.INSTANCE_IMPORT):
2855
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2856
                                 self.op.mode)
2857

    
2858
    if self.op.mode == constants.INSTANCE_IMPORT:
2859
      src_node = getattr(self.op, "src_node", None)
2860
      src_path = getattr(self.op, "src_path", None)
2861
      if src_node is None or src_path is None:
2862
        raise errors.OpPrereqError("Importing an instance requires source"
2863
                                   " node and path options")
2864
      src_node_full = self.cfg.ExpandNodeName(src_node)
2865
      if src_node_full is None:
2866
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2867
      self.op.src_node = src_node = src_node_full
2868

    
2869
      if not os.path.isabs(src_path):
2870
        raise errors.OpPrereqError("The source path must be absolute")
2871

    
2872
      export_info = rpc.call_export_info(src_node, src_path)
2873

    
2874
      if not export_info:
2875
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
2876

    
2877
      if not export_info.has_section(constants.INISECT_EXP):
2878
        raise errors.ProgrammerError("Corrupted export config")
2879

    
2880
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
2881
      if (int(ei_version) != constants.EXPORT_VERSION):
2882
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2883
                                   (ei_version, constants.EXPORT_VERSION))
2884

    
2885
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2886
        raise errors.OpPrereqError("Can't import instance with more than"
2887
                                   " one data disk")
2888

    
2889
      # FIXME: are the old os-es, disk sizes, etc. useful?
2890
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2891
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2892
                                                         'disk0_dump'))
2893
      self.src_image = diskimage
2894
    else: # INSTANCE_CREATE
2895
      if getattr(self.op, "os_type", None) is None:
2896
        raise errors.OpPrereqError("No guest OS specified")
2897

    
2898
    # check primary node
2899
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2900
    if pnode is None:
2901
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
2902
                                 self.op.pnode)
2903
    self.op.pnode = pnode.name
2904
    self.pnode = pnode
2905
    self.secondaries = []
2906
    # disk template and mirror node verification
2907
    if self.op.disk_template not in constants.DISK_TEMPLATES:
2908
      raise errors.OpPrereqError("Invalid disk template name")
2909

    
2910
    if self.op.disk_template in constants.DTS_NET_MIRROR:
2911
      if getattr(self.op, "snode", None) is None:
2912
        raise errors.OpPrereqError("The networked disk templates need"
2913
                                   " a mirror node")
2914

    
2915
      snode_name = self.cfg.ExpandNodeName(self.op.snode)
2916
      if snode_name is None:
2917
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
2918
                                   self.op.snode)
2919
      elif snode_name == pnode.name:
2920
        raise errors.OpPrereqError("The secondary node cannot be"
2921
                                   " the primary node.")
2922
      self.secondaries.append(snode_name)
2923

    
2924
    # Check lv size requirements
2925
    nodenames = [pnode.name] + self.secondaries
2926
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2927

    
2928
    # Required free disk space as a function of disk and swap space
2929
    req_size_dict = {
2930
      constants.DT_DISKLESS: 0,
2931
      constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2932
      constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2933
      # 256 MB are added for drbd metadata, 128MB for each drbd device
2934
      constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2935
      constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
2936
    }
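
    # Editor's note (worked example): for the drbd8 template with
    # disk_size=10240 and swap_size=4096, the required free space checked on
    # each of the involved nodes is 10240 + 4096 + 256 = 14592 MB.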
2937

    
2938
    if self.op.disk_template not in req_size_dict:
2939
      raise errors.ProgrammerError("Disk template '%s' size requirement"
2940
                                   " is unknown" %  self.op.disk_template)
2941

    
2942
    req_size = req_size_dict[self.op.disk_template]
2943

    
2944
    for node in nodenames:
2945
      info = nodeinfo.get(node, None)
2946
      if not info:
2947
        raise errors.OpPrereqError("Cannot get current information"
2948
                                   " from node '%s'" % nodeinfo)
2949
      if req_size > info['vg_free']:
2950
        raise errors.OpPrereqError("Not enough disk space on target node %s."
2951
                                   " %d MB available, %d MB required" %
2952
                                   (node, info['vg_free'], req_size))
2953

    
2954
    # os verification
2955
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2956
    if not os_obj:
2957
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
2958
                                 " primary node"  % self.op.os_type)
2959

    
2960
    # instance verification
2961
    hostname1 = utils.HostInfo(self.op.instance_name)
2962

    
2963
    self.op.instance_name = instance_name = hostname1.name
2964
    instance_list = self.cfg.GetInstanceList()
2965
    if instance_name in instance_list:
2966
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2967
                                 instance_name)
2968

    
2969
    ip = getattr(self.op, "ip", None)
2970
    if ip is None or ip.lower() == "none":
2971
      inst_ip = None
2972
    elif ip.lower() == "auto":
2973
      inst_ip = hostname1.ip
2974
    else:
2975
      if not utils.IsValidIP(ip):
2976
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
2977
                                   " like a valid IP" % ip)
2978
      inst_ip = ip
2979
    self.inst_ip = inst_ip
2980

    
2981
    if self.op.start and not self.op.ip_check:
2982
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
2983
                                 " adding an instance in start mode")
2984

    
2985
    if self.op.ip_check:
2986
      if utils.TcpPing(utils.HostInfo().name, hostname1.ip,
2987
                       constants.DEFAULT_NODED_PORT):
2988
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2989
                                   (hostname1.ip, instance_name))
2990

    
2991
    # bridge verification
2992
    bridge = getattr(self.op, "bridge", None)
2993
    if bridge is None:
2994
      self.op.bridge = self.cfg.GetDefBridge()
2995
    else:
2996
      self.op.bridge = bridge
2997

    
2998
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
2999
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
3000
                                 " destination node '%s'" %
3001
                                 (self.op.bridge, pnode.name))
3002

    
3003
    if self.op.start:
3004
      self.instance_status = 'up'
3005
    else:
3006
      self.instance_status = 'down'
3007

    
3008
  def Exec(self, feedback_fn):
3009
    """Create and add the instance to the cluster.
3010

3011
    """
3012
    instance = self.op.instance_name
3013
    pnode_name = self.pnode.name
3014

    
3015
    nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
3016
    if self.inst_ip is not None:
3017
      nic.ip = self.inst_ip
3018

    
3019
    disks = _GenerateDiskTemplate(self.cfg,
3020
                                  self.op.disk_template,
3021
                                  instance, pnode_name,
3022
                                  self.secondaries, self.op.disk_size,
3023
                                  self.op.swap_size)
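    # the disk objects generated here depend on the template; the data and
    # swap devices are the ones addressed below via the "sda"/"sdb" names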
3024

    
3025
    iobj = objects.Instance(name=instance, os=self.op.os_type,
3026
                            primary_node=pnode_name,
3027
                            memory=self.op.mem_size,
3028
                            vcpus=self.op.vcpus,
3029
                            nics=[nic], disks=disks,
3030
                            disk_template=self.op.disk_template,
3031
                            status=self.instance_status,
3032
                            )
3033

    
3034
    feedback_fn("* creating instance disks...")
3035
    if not _CreateDisks(self.cfg, iobj):
3036
      _RemoveDisks(iobj, self.cfg)
3037
      raise errors.OpExecError("Device creation failed, reverting...")
3038

    
3039
    feedback_fn("adding instance %s to cluster config" % instance)
3040

    
3041
    self.cfg.AddInstance(iobj)
3042

    
3043
    if self.op.wait_for_sync:
3044
      disk_abort = not _WaitForSync(self.cfg, iobj)
3045
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
3046
      # make sure the disks are not degraded (still sync-ing is ok)
3047
      time.sleep(15)
3048
      feedback_fn("* checking mirrors status")
3049
      disk_abort = not _WaitForSync(self.cfg, iobj, oneshot=True)
3050
    else:
3051
      disk_abort = False
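    # in short: either block until the mirror is fully synced, do a single
    # non-blocking degradation check after giving DRBD a moment to connect,
    # or skip the check entirely for non-mirrored templates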
3052

    
3053
    if disk_abort:
3054
      _RemoveDisks(iobj, self.cfg)
3055
      self.cfg.RemoveInstance(iobj.name)
3056
      raise errors.OpExecError("There are some degraded disks for"
3057
                               " this instance")
3058

    
3059
    feedback_fn("creating os for instance %s on node %s" %
3060
                (instance, pnode_name))
3061

    
3062
    if iobj.disk_template != constants.DT_DISKLESS:
3063
      if self.op.mode == constants.INSTANCE_CREATE:
3064
        feedback_fn("* running the instance OS create scripts...")
3065
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3066
          raise errors.OpExecError("could not add os for instance %s"
3067
                                   " on node %s" %
3068
                                   (instance, pnode_name))
3069

    
3070
      elif self.op.mode == constants.INSTANCE_IMPORT:
3071
        feedback_fn("* running the instance OS import scripts...")
3072
        src_node = self.op.src_node
3073
        src_image = self.src_image
3074
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3075
                                                src_node, src_image):
3076
          raise errors.OpExecError("Could not import os for instance"
3077
                                   " %s on node %s" %
3078
                                   (instance, pnode_name))
3079
      else:
3080
        # also checked in the prereq part
3081
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3082
                                     % self.op.mode)
3083

    
3084
    if self.op.start:
3085
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3086
      feedback_fn("* starting instance...")
3087
      if not rpc.call_instance_start(pnode_name, iobj, None):
3088
        raise errors.OpExecError("Could not start instance")
3089

    
3090

    
3091
class LUConnectConsole(NoHooksLU):
3092
  """Connect to an instance's console.
3093

3094
  This is somewhat special in that it returns the command line that
3095
  you need to run on the master node in order to connect to the
3096
  console.
3097

3098
  """
3099
  _OP_REQP = ["instance_name"]
3100

    
3101
  def CheckPrereq(self):
3102
    """Check prerequisites.
3103

3104
    This checks that the instance is in the cluster.
3105

3106
    """
3107
    instance = self.cfg.GetInstanceInfo(
3108
      self.cfg.ExpandInstanceName(self.op.instance_name))
3109
    if instance is None:
3110
      raise errors.OpPrereqError("Instance '%s' not known" %
3111
                                 self.op.instance_name)
3112
    self.instance = instance
3113

    
3114
  def Exec(self, feedback_fn):
3115
    """Connect to the console of an instance
3116

3117
    """
3118
    instance = self.instance
3119
    node = instance.primary_node
3120

    
3121
    node_insts = rpc.call_instance_list([node])[node]
3122
    if node_insts is False:
3123
      raise errors.OpExecError("Can't connect to node %s." % node)
3124

    
3125
    if instance.name not in node_insts:
3126
      raise errors.OpExecError("Instance %s is not running." % instance.name)
3127

    
3128
    logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3129

    
3130
    hyper = hypervisor.GetHypervisor()
3131
    console_cmd = hyper.GetShellCommandForConsole(instance.name)
3132
    # build ssh cmdline
3133
    argv = ["ssh", "-q", "-t"]
3134
    argv.extend(ssh.KNOWN_HOSTS_OPTS)
3135
    argv.extend(ssh.BATCH_MODE_OPTS)
3136
    argv.append(node)
3137
    argv.append(console_cmd)
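    # what gets returned is the program name plus its full argument vector,
    # roughly ("ssh", ["ssh", "-q", "-t", <known-hosts options>,
    # <batch-mode options>, node, console_cmd]); the caller is expected to
    # run this on the master node, as the class docstring explains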
3138
    return "ssh", argv
3139

    
3140

    
3141
class LUAddMDDRBDComponent(LogicalUnit):
3142
  """Adda new mirror member to an instance's disk.
3143

3144
  """
3145
  HPATH = "mirror-add"
3146
  HTYPE = constants.HTYPE_INSTANCE
3147
  _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3148

    
3149
  def BuildHooksEnv(self):
3150
    """Build hooks env.
3151

3152
    This runs on the master, the primary and all the secondaries.
3153

3154
    """
3155
    env = {
3156
      "NEW_SECONDARY": self.op.remote_node,
3157
      "DISK_NAME": self.op.disk_name,
3158
      }
3159
    env.update(_BuildInstanceHookEnvByObject(self.instance))
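    # besides NEW_SECONDARY and DISK_NAME, env now carries the per-instance
    # variables from _BuildInstanceHookEnvByObject; the node list built below
    # (master, primary, new and current secondaries) serves both hook phases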
3160
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3161
          self.op.remote_node,] + list(self.instance.secondary_nodes)
3162
    return env, nl, nl
3163

    
3164
  def CheckPrereq(self):
3165
    """Check prerequisites.
3166

3167
    This checks that the instance is in the cluster.
3168

3169
    """
3170
    instance = self.cfg.GetInstanceInfo(
3171
      self.cfg.ExpandInstanceName(self.op.instance_name))
3172
    if instance is None:
3173
      raise errors.OpPrereqError("Instance '%s' not known" %
3174
                                 self.op.instance_name)
3175
    self.instance = instance
3176

    
3177
    remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3178
    if remote_node is None:
3179
      raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3180
    self.remote_node = remote_node
3181

    
3182
    if remote_node == instance.primary_node:
3183
      raise errors.OpPrereqError("The specified node is the primary node of"
3184
                                 " the instance.")
3185

    
3186
    if instance.disk_template != constants.DT_REMOTE_RAID1:
3187
      raise errors.OpPrereqError("Instance's disk layout is not"
3188
                                 " remote_raid1.")
3189
    for disk in instance.disks:
3190
      if disk.iv_name == self.op.disk_name:
3191
        break
3192
    else:
3193
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
3194
                                 " instance." % self.op.disk_name)
3195
    if len(disk.children) > 1:
3196
      raise errors.OpPrereqError("The device already has two slave"
3197
                                 " devices.\n"
3198
                                 "This would create a 3-disk raid1"
3199
                                 " which we don't allow.")
3200
    self.disk = disk
3201

    
3202
  def Exec(self, feedback_fn):
3203
    """Add the mirror component
3204

3205
    """
3206
    disk = self.disk
3207
    instance = self.instance
3208

    
3209
    remote_node = self.remote_node
3210
    lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3211
    names = _GenerateUniqueNames(self.cfg, lv_names)
3212
    new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3213
                                     remote_node, disk.size, names)
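    # new_drbd is the generated mirror branch for this disk, presumably a
    # DRBD device backed by the data/meta volumes named above, spanning the
    # primary node and remote_node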
3214

    
3215
    logger.Info("adding new mirror component on secondary")
3216
    #HARDCODE
3217
    if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
3218
                                      new_drbd, False,
3219
                                      _GetInstanceInfoText(instance)):
3220
      raise errors.OpExecError("Failed to create new component on secondary"
3221
                               " node %s" % remote_node)
3222

    
3223
    logger.Info("adding new mirror component on primary")
3224
    #HARDCODE
3225
    if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
3226
                                    instance, new_drbd,
3227
                                    _GetInstanceInfoText(instance)):
3228
      # remove secondary dev
3229
      self.cfg.SetDiskID(new_drbd, remote_node)
3230
      rpc.call_blockdev_remove(remote_node, new_drbd)
3231
      raise errors.OpExecError("Failed to create volume on primary")
3232

    
3233
    # the device exists now
3234
    # call the primary node to add the mirror to md
3235
    logger.Info("adding new mirror component to md")
3236
    if not rpc.call_blockdev_addchildren(instance.primary_node,
3237
                                         disk, [new_drbd]):
3238
      logger.Error("Can't add mirror compoment to md!")
3239
      self.cfg.SetDiskID(new_drbd, remote_node)
3240
      if not rpc.call_blockdev_remove(remote_node, new_drbd):
3241
        logger.Error("Can't rollback on secondary")
3242
      self.cfg.SetDiskID(new_drbd, instance.primary_node)
3243
      if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3244
        logger.Error("Can't rollback on primary")
3245
      raise errors.OpExecError("Can't add mirror component to md array")
3246

    
3247
    disk.children.append(new_drbd)
3248

    
3249
    self.cfg.AddInstance(instance)
3250

    
3251
    _WaitForSync(self.cfg, instance)
3252

    
3253
    return 0
3254

    
3255

    
3256
class LURemoveMDDRBDComponent(LogicalUnit):
3257
  """Remove a component from a remote_raid1 disk.
3258

3259
  """
3260
  HPATH = "mirror-remove"
3261
  HTYPE = constants.HTYPE_INSTANCE
3262
  _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3263

    
3264
  def BuildHooksEnv(self):
3265
    """Build hooks env.
3266

3267
    This runs on the master, the primary and all the secondaries.
3268

3269
    """
3270
    env = {
3271
      "DISK_NAME": self.op.disk_name,
3272
      "DISK_ID": self.op.disk_id,
3273
      "OLD_SECONDARY": self.old_secondary,
3274
      }
3275
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3276
    nl = [self.sstore.GetMasterNode(),
3277
          self.instance.primary_node] + list(self.instance.secondary_nodes)
3278
    return env, nl, nl
3279

    
3280
  def CheckPrereq(self):
3281
    """Check prerequisites.
3282

3283
    This checks that the instance is in the cluster.
3284

3285
    """
3286
    instance = self.cfg.GetInstanceInfo(
3287
      self.cfg.ExpandInstanceName(self.op.instance_name))
3288
    if instance is None:
3289
      raise errors.OpPrereqError("Instance '%s' not known" %
3290
                                 self.op.instance_name)
3291
    self.instance = instance
3292

    
3293
    if instance.disk_template != constants.DT_REMOTE_RAID1:
3294
      raise errors.OpPrereqError("Instance's disk layout is not"
3295
                                 " remote_raid1.")
3296
    for disk in instance.disks:
3297
      if disk.iv_name == self.op.disk_name:
3298
        break
3299
    else:
3300
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
3301
                                 " instance." % self.op.disk_name)
3302
    for child in disk.children:
3303
      if (child.dev_type == constants.LD_DRBD7 and
3304
          child.logical_id[2] == self.op.disk_id):
3305
        break
3306
    else:
3307
      raise errors.OpPrereqError("Can't find the device with this port.")
3308

    
3309
    if len(disk.children) < 2:
3310
      raise errors.OpPrereqError("Cannot remove the last component from"
3311
                                 " a mirror.")
3312
    self.disk = disk
3313
    self.child = child
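    # child.logical_id holds the two end nodes and the DRBD port (matched
    # against disk_id above); the old secondary is whichever end is not the
    # primary node, which is what the index selection below computes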
3314
    if self.child.logical_id[0] == instance.primary_node:
3315
      oid = 1
3316
    else:
3317
      oid = 0
3318
    self.old_secondary = self.child.logical_id[oid]
3319

    
3320
  def Exec(self, feedback_fn):
3321
    """Remove the mirror component
3322

3323
    """
3324
    instance = self.instance
3325
    disk = self.disk
3326
    child = self.child
3327
    logger.Info("remove mirror component")
3328
    self.cfg.SetDiskID(disk, instance.primary_node)
3329
    if not rpc.call_blockdev_removechildren(instance.primary_node,
3330
                                            disk, [child]):
3331
      raise errors.OpExecError("Can't remove child from mirror.")
3332

    
3333
    for node in child.logical_id[:2]:
3334
      self.cfg.SetDiskID(child, node)
3335
      if not rpc.call_blockdev_remove(node, child):
3336
        logger.Error("Warning: failed to remove device from node %s,"
3337
                     " continuing operation." % node)
3338

    
3339
    disk.children.remove(child)
3340
    self.cfg.AddInstance(instance)
3341

    
3342

    
3343
class LUReplaceDisks(LogicalUnit):
3344
  """Replace the disks of an instance.
3345

3346
  """
3347
  HPATH = "mirrors-replace"
3348
  HTYPE = constants.HTYPE_INSTANCE
3349
  _OP_REQP = ["instance_name", "mode", "disks"]
3350

    
3351
  def BuildHooksEnv(self):
3352
    """Build hooks env.
3353

3354
    This runs on the master, the primary and all the secondaries.
3355

3356
    """
3357
    env = {
3358
      "MODE": self.op.mode,
3359
      "NEW_SECONDARY": self.op.remote_node,
3360
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
3361
      }
3362
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3363
    nl = [self.sstore.GetMasterNode(),
3364
          self.instance.primary_node] + list(self.instance.secondary_nodes)
3365
    return env, nl, nl
3366

    
3367
  def CheckPrereq(self):
3368
    """Check prerequisites.
3369

3370
    This checks that the instance is in the cluster.
3371

3372
    """
3373
    instance = self.cfg.GetInstanceInfo(
3374
      self.cfg.ExpandInstanceName(self.op.instance_name))
3375
    if instance is None:
3376
      raise errors.OpPrereqError("Instance '%s' not known" %
3377
                                 self.op.instance_name)
3378
    self.instance = instance
3379

    
3380
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3381
      raise errors.OpPrereqError("Instance's disk layout is not"
3382
                                 " network mirrored.")
3383

    
3384
    if len(instance.secondary_nodes) != 1:
3385
      raise errors.OpPrereqError("The instance has a strange layout,"
3386
                                 " expected one secondary but found %d" %
3387
                                 len(instance.secondary_nodes))
3388

    
3389
    self.sec_node = instance.secondary_nodes[0]
3390

    
3391
    remote_node = getattr(self.op, "remote_node", None)
3392
    if remote_node is not None:
3393
      remote_node = self.cfg.ExpandNodeName(remote_node)
3394
      if remote_node is None:
3395
        raise errors.OpPrereqError("Node '%s' not known" %
3396
                                   self.op.remote_node)
3397
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3398
    else:
3399
      self.remote_node_info = None
3400
    if remote_node == instance.primary_node:
3401
      raise errors.OpPrereqError("The specified node is the primary node of"
3402
                                 " the instance.")
3403
    elif remote_node == self.sec_node:
3404
      # the user gave the current secondary, switch to
3405
      # 'no-replace-secondary' mode
3406
      remote_node = None
3407
    if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3408
        self.op.mode != constants.REPLACE_DISK_ALL):
3409
      raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3410
                                 " disks replacement, not individual ones")
3411
    if instance.disk_template == constants.DT_DRBD8:
3412
      if self.op.mode == constants.REPLACE_DISK_ALL:
3413
        raise errors.OpPrereqError("Template 'drbd8' only allows primary or"
3414
                                   " secondary disk replacement, not"
3415
                                   " both at once")
3416
      elif self.op.mode == constants.REPLACE_DISK_PRI:
3417
        if remote_node is not None:
3418
          raise errors.OpPrereqError("Template 'drbd8' does not allow changing"
3419
                                     " the secondary while doing a primary"
3420
                                     " node disk replacement")
3421
        self.tgt_node = instance.primary_node
3422
        self.oth_node = instance.secondary_nodes[0]
3423
      elif self.op.mode == constants.REPLACE_DISK_SEC:
3424
        self.new_node = remote_node # this can be None, in which case
3425
                                    # we don't change the secondary
3426
        self.tgt_node = instance.secondary_nodes[0]
3427
        self.oth_node = instance.primary_node
3428
      else:
3429
        raise errors.ProgrammerError("Unhandled disk replace mode")
3430

    
3431
    for name in self.op.disks:
3432
      if instance.FindDisk(name) is None:
3433
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3434
                                   (name, instance.name))
3435
    self.op.remote_node = remote_node
3436

    
3437
  def _ExecRR1(self, feedback_fn):
3438
    """Replace the disks of an instance.
3439

3440
    """
3441
    instance = self.instance
3442
    iv_names = {}
3443
    # start of work
3444
    if self.op.remote_node is None:
3445
      remote_node = self.sec_node
3446
    else:
3447
      remote_node = self.op.remote_node
3448
    cfg = self.cfg
3449
    for dev in instance.disks:
3450
      size = dev.size
3451
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3452
      names = _GenerateUniqueNames(cfg, lv_names)
3453
      new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3454
                                       remote_node, size, names)
3455
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3456
      logger.Info("adding new mirror component on secondary for %s" %
3457
                  dev.iv_name)
3458
      #HARDCODE
3459
      if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3460
                                        new_drbd, False,
3461
                                        _GetInstanceInfoText(instance)):
3462
        raise errors.OpExecError("Failed to create new component on"
3463
                                 " secondary node %s\n"
3464
                                 "Full abort, cleanup manually!" %
3465
                                 remote_node)
3466

    
3467
      logger.Info("adding new mirror component on primary")
3468
      #HARDCODE
3469
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3470
                                      instance, new_drbd,
3471
                                      _GetInstanceInfoText(instance)):
3472
        # remove secondary dev
3473
        cfg.SetDiskID(new_drbd, remote_node)
3474
        rpc.call_blockdev_remove(remote_node, new_drbd)
3475
        raise errors.OpExecError("Failed to create volume on primary!\n"
3476
                                 "Full abort, cleanup manually!!")
3477

    
3478
      # the device exists now
3479
      # call the primary node to add the mirror to md
3480
      logger.Info("adding new mirror component to md")
3481
      if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3482
                                           [new_drbd]):
3483
        logger.Error("Can't add mirror compoment to md!")
3484
        cfg.SetDiskID(new_drbd, remote_node)
3485
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
3486
          logger.Error("Can't rollback on secondary")
3487
        cfg.SetDiskID(new_drbd, instance.primary_node)
3488
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3489
          logger.Error("Can't rollback on primary")
3490
        raise errors.OpExecError("Full abort, cleanup manually!!")
3491

    
3492
      dev.children.append(new_drbd)
3493
      cfg.AddInstance(instance)
3494

    
3495
    # this can fail as the old devices are degraded and _WaitForSync
3496
    # does a combined result over all disks, so we don't check its
3497
    # return value
3498
    _WaitForSync(cfg, instance, unlock=True)
3499

    
3500
    # so check manually all the devices
3501
    for name in iv_names:
3502
      dev, child, new_drbd = iv_names[name]
3503
      cfg.SetDiskID(dev, instance.primary_node)
3504
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3505
      if is_degr:
3506
        raise errors.OpExecError("MD device %s is degraded!" % name)
3507
      cfg.SetDiskID(new_drbd, instance.primary_node)
3508
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3509
      if is_degr:
3510
        raise errors.OpExecError("New drbd device %s is degraded!" % name)
3511

    
3512
    for name in iv_names:
3513
      dev, child, new_drbd = iv_names[name]
3514
      logger.Info("remove mirror %s component" % name)
3515
      cfg.SetDiskID(dev, instance.primary_node)
3516
      if not rpc.call_blockdev_removechildren(instance.primary_node,
3517
                                              dev, [child]):
3518
        logger.Error("Can't remove child from mirror, aborting"
3519
                     " *this device cleanup*.\nYou need to cleanup manually!!")
3520
        continue
3521

    
3522
      for node in child.logical_id[:2]:
3523
        logger.Info("remove child device on %s" % node)
3524
        cfg.SetDiskID(child, node)
3525
        if not rpc.call_blockdev_remove(node, child):
3526
          logger.Error("Warning: failed to remove device from node %s,"
3527
                       " continuing operation." % node)
3528

    
3529
      dev.children.remove(child)
3530

    
3531
      cfg.AddInstance(instance)
3532

    
3533
  def _ExecD8DiskOnly(self, feedback_fn):
3534
    """Replace a disk on the primary or secondary for dbrd8.
3535

3536
    The algorithm for replace is quite complicated:
3537
      - for each disk to be replaced:
3538
        - create new LVs on the target node with unique names
3539
        - detach old LVs from the drbd device
3540
        - rename old LVs to name_replaced.<time_t>
3541
        - rename new LVs to old LVs
3542
        - attach the new LVs (with the old names now) to the drbd device
3543
      - wait for sync across all devices
3544
      - for each modified disk:
3545
        - remove old LVs (which have the name name_replaces.<time_t>)
3546

3547
    Failures are not very well handled.
3548

3549
    """
3550
    steps_total = 6
3551
    warning, info = (self.processor.LogWarning, self.processor.LogInfo)
3552
    instance = self.instance
3553
    iv_names = {}
3554
    vgname = self.cfg.GetVGName()
3555
    # start of work
3556
    cfg = self.cfg
3557
    tgt_node = self.tgt_node
3558
    oth_node = self.oth_node
3559

    
3560
    # Step: check device activation
3561
    self.processor.LogStep(1, steps_total, "check device existence")
3562
    info("checking volume groups")
3563
    my_vg = cfg.GetVGName()
3564
    results = rpc.call_vg_list([oth_node, tgt_node])
3565
    if not results:
3566
      raise errors.OpExecError("Can't list volume groups on the nodes")
3567
    for node in oth_node, tgt_node:
3568
      res = results.get(node, False)
3569
      if not res or my_vg not in res:
3570
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3571
                                 (my_vg, node))
3572
    for dev in instance.disks:
3573
      if not dev.iv_name in self.op.disks:
3574
        continue
3575
      for node in tgt_node, oth_node:
3576
        info("checking %s on %s" % (dev.iv_name, node))
3577
        cfg.SetDiskID(dev, node)
3578
        if not rpc.call_blockdev_find(node, dev):
3579
          raise errors.OpExecError("Can't find device %s on node %s" %
3580
                                   (dev.iv_name, node))
3581

    
3582
    # Step: check other node consistency
3583
    self.processor.LogStep(2, steps_total, "check peer consistency")
3584
    for dev in instance.disks:
3585
      if not dev.iv_name in self.op.disks:
3586
        continue
3587
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3588
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3589
                                   oth_node==instance.primary_node):
3590
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3591
                                 " to replace disks on this node (%s)" %
3592
                                 (oth_node, tgt_node))
3593

    
3594
    # Step: create new storage
3595
    self.processor.LogStep(3, steps_total, "allocate new storage")
3596
    for dev in instance.disks:
3597
      if not dev.iv_name in self.op.disks:
3598
        continue
3599
      size = dev.size
3600
      cfg.SetDiskID(dev, tgt_node)
3601
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3602
      names = _GenerateUniqueNames(cfg, lv_names)
3603
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3604
                             logical_id=(vgname, names[0]))
3605
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3606
                             logical_id=(vgname, names[1]))
3607
      new_lvs = [lv_data, lv_meta]
3608
      old_lvs = dev.children
3609
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3610
      info("creating new local storage on %s for %s" %
3611
           (tgt_node, dev.iv_name))
3612
      # since we *always* want to create this LV, we use the
3613
      # _Create...OnPrimary (which forces the creation), even if we
3614
      # are talking about the secondary node
3615
      for new_lv in new_lvs:
3616
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3617
                                        _GetInstanceInfoText(instance)):
3618
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3619
                                   " node '%s'" %
3620
                                   (new_lv.logical_id[1], tgt_node))
3621

    
3622
    # Step: for each lv, detach+rename*2+attach
3623
    self.processor.LogStep(4, steps_total, "change drbd configuration")
3624
    for dev, old_lvs, new_lvs in iv_names.itervalues():
3625
      info("detaching %s drbd from local storage" % dev.iv_name)
3626
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3627
        raise errors.OpExecError("Can't detach drbd from local storage on node"
3628
                                 " %s for device %s" % (tgt_node, dev.iv_name))
3629
      #dev.children = []
3630
      #cfg.Update(instance)
3631

    
3632
      # ok, we created the new LVs, so now we know we have the needed
3633
      # storage; as such, we proceed on the target node to rename
3634
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3635
      # using the assumption that logical_id == physical_id (which in
3636
      # turn is the unique_id on that node)
3637

    
3638
      # FIXME(iustin): use a better name for the replaced LVs
3639
      temp_suffix = int(time.time())
3640
      ren_fn = lambda d, suff: (d.physical_id[0],
3641
                                d.physical_id[1] + "_replaced-%s" % suff)
3642
      # build the rename list based on what LVs exist on the node
3643
      rlist = []
3644
      for to_ren in old_lvs:
3645
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3646
        if find_res is not None: # device exists
3647
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3648

    
3649
      info("renaming the old LVs on the target node")
3650
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3651
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3652
      # now we rename the new LVs to the old LVs
3653
      info("renaming the new LVs on the target node")
3654
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3655
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3656
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3657

    
3658
      for old, new in zip(old_lvs, new_lvs):
3659
        new.logical_id = old.logical_id
3660
        cfg.SetDiskID(new, tgt_node)
3661

    
3662
      for disk in old_lvs:
3663
        disk.logical_id = ren_fn(disk, temp_suffix)
3664
        cfg.SetDiskID(disk, tgt_node)
3665

    
3666
      # now that the new lvs have the old name, we can add them to the device
3667
      info("adding new mirror component on %s" % tgt_node)
3668
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3669
        for new_lv in new_lvs:
3670
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
3671
            warning("Can't rollback device %s", "manually cleanup unused"
3672
                    " logical volumes")
3673
        raise errors.OpExecError("Can't add local storage to drbd")
3674

    
3675
      dev.children = new_lvs
3676
      cfg.Update(instance)
3677

    
3678
    # Step: wait for sync
3679

    
3680
    # this can fail as the old devices are degraded and _WaitForSync
3681
    # does a combined result over all disks, so we don't check its
3682
    # return value
3683
    self.processor.LogStep(5, steps_total, "sync devices")
3684
    _WaitForSync(cfg, instance, unlock=True)
3685

    
3686
    # so check manually all the devices
3687
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3688
      cfg.SetDiskID(dev, instance.primary_node)
3689
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3690
      if is_degr:
3691
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3692

    
3693
    # Step: remove old storage
3694
    self.processor.LogStep(6, steps_total, "removing old storage")
3695
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3696
      info("remove logical volumes for %s" % name)
3697
      for lv in old_lvs:
3698
        cfg.SetDiskID(lv, tgt_node)
3699
        if not rpc.call_blockdev_remove(tgt_node, lv):
3700
          warning("Can't remove old LV", "manually remove unused LVs")
3701
          continue
3702

    
3703
  def _ExecD8Secondary(self, feedback_fn):
3704
    """Replace the secondary node for drbd8.
3705

3706
    The algorithm for replace is quite complicated:
3707
      - for all disks of the instance:
3708
        - create new LVs on the new node with same names
3709
        - shutdown the drbd device on the old secondary
3710
        - disconnect the drbd network on the primary
3711
        - create the drbd device on the new secondary
3712
        - network attach the drbd on the primary, using an artifice:
3713
          the drbd code for Attach() will connect to the network if it
3714
          finds a device which is connected to the good local disks but
3715
          not network enabled
3716
      - wait for sync across all devices
3717
      - remove all disks from the old secondary
3718

3719
    Failures are not very well handled.
3720
    """
3721
    instance = self.instance
3722
    iv_names = {}
3723
    vgname = self.cfg.GetVGName()
3724
    # start of work
3725
    cfg = self.cfg
3726
    old_node = self.tgt_node
3727
    new_node = self.new_node
3728
    pri_node = instance.primary_node
3729
    for dev in instance.disks:
3730
      size = dev.size
3731
      logger.Info("adding new local storage on %s for %s" %
3732
                  (new_node, dev.iv_name))
3733
      # since we *always* want to create this LV, we use the
3734
      # _Create...OnPrimary (which forces the creation), even if we
3735
      # are talking about the secondary node
3736
      for new_lv in dev.children:
3737
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3738
                                        _GetInstanceInfoText(instance)):
3739
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3740
                                   " node '%s'" %
3741
                                   (new_lv.logical_id[1], new_node))
3742

    
3743
      # create new devices on new_node
3744
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3745
                              logical_id=(pri_node, new_node,
3746
                                          dev.logical_id[2]),
3747
                              children=dev.children)
3748
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3749
                                        new_drbd, False,
3750
                                      _GetInstanceInfoText(instance)):
3751
        raise errors.OpExecError("Failed to create new DRBD on"
3752
                                 " node '%s'" % new_node)
3753

    
3754
      # we have new devices, shutdown the drbd on the old secondary
3755
      cfg.SetDiskID(dev, old_node)
3756
      if not rpc.call_blockdev_shutdown(old_node, dev):
3757
        raise errors.OpExecError("Failed to shutdown DRBD on old node")
3758

    
3759
      # we have new storage, we 'rename' the network on the primary
3760
      cfg.SetDiskID(dev, pri_node)
3761
      # rename to the ip of the new node
3762
      new_uid = list(dev.physical_id)
3763
      new_uid[2] = self.remote_node_info.secondary_ip
3764
      rlist = [(dev, tuple(new_uid))]
3765
      if not rpc.call_blockdev_rename(pri_node, rlist):
3766
        raise errors.OpExecError("Can't detach re-attach drbd %s on node"
3767
                                 " %s from %s to %s" %
3768
                                 (dev.iv_name, pri_node, old_node, new_node))
3769
      dev.logical_id = (pri_node, new_node, dev.logical_id[2])
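      # keep the same DRBD port but point logical_id at the new secondary;
      # the rename above already re-attached the device to the new node's
      # secondary IP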
3770
      cfg.SetDiskID(dev, pri_node)
3771
      cfg.Update(instance)
3772

    
3773
      iv_names[dev.iv_name] = (dev, dev.children)
3774

    
3775
    # this can fail as the old devices are degraded and _WaitForSync
3776
    # does a combined result over all disks, so we don't check its
3777
    # return value
3778
    logger.Info("Done changing drbd configs, waiting for sync")
3779
    _WaitForSync(cfg, instance, unlock=True)
3780

    
3781
    # so check manually all the devices
3782
    for name, (dev, old_lvs) in iv_names.iteritems():
3783
      cfg.SetDiskID(dev, pri_node)
3784
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3785
      if is_degr:
3786
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3787

    
3788
    for name, (dev, old_lvs) in iv_names.iteritems():
3789
      logger.Info("remove logical volumes for %s" % name)
3790
      for lv in old_lvs:
3791
        cfg.SetDiskID(lv, old_node)
3792
        if not rpc.call_blockdev_remove(old_node, lv):
3793
          logger.Error("Can't cleanup child device, skipping. You need to"
3794
                       " fix manually!")
3795
          continue
3796

    
3797
  def Exec(self, feedback_fn):
3798
    """Execute disk replacement.
3799

3800
    This dispatches the disk replacement to the appropriate handler.
3801

3802
    """
3803
    instance = self.instance
3804
    if instance.disk_template == constants.DT_REMOTE_RAID1:
3805
      fn = self._ExecRR1
3806
    elif instance.disk_template == constants.DT_DRBD8:
3807
      if self.op.remote_node is None:
3808
        fn = self._ExecD8DiskOnly
3809
      else:
3810
        fn = self._ExecD8Secondary
3811
    else:
3812
      raise errors.ProgrammerError("Unhandled disk replacement case")
3813
    return fn(feedback_fn)
3814

    
3815

    
3816
class LUQueryInstanceData(NoHooksLU):
3817
  """Query runtime instance data.
3818

3819
  """
3820
  _OP_REQP = ["instances"]
3821

    
3822
  def CheckPrereq(self):
3823
    """Check prerequisites.
3824

3825
    This only checks the optional instance list against the existing names.
3826

3827
    """
3828
    if not isinstance(self.op.instances, list):
3829
      raise errors.OpPrereqError("Invalid argument type 'instances'")
3830
    if self.op.instances:
3831
      self.wanted_instances = []
3832
      names = self.op.instances
3833
      for name in names:
3834
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3835
        if instance is None:
3836
          raise errors.OpPrereqError("No such instance name '%s'" % name)
3837
        self.wanted_instances.append(instance)
3838
    else:
3839
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3840
                               in self.cfg.GetInstanceList()]
3841
    return
3842

    
3843

    
3844
  def _ComputeDiskStatus(self, instance, snode, dev):
3845
    """Compute block device status.
3846

3847
    """
3848
    self.cfg.SetDiskID(dev, instance.primary_node)
3849
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3850
    if dev.dev_type in constants.LDS_DRBD:
3851
      # we change the snode then (otherwise we use the one passed in)
3852
      if dev.logical_id[0] == instance.primary_node:
3853
        snode = dev.logical_id[1]
3854
      else:
3855
        snode = dev.logical_id[0]
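      # DRBD devices carry both end nodes in logical_id, so the secondary
      # status is queried from whichever end is not the primary node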
3856

    
3857
    if snode:
3858
      self.cfg.SetDiskID(dev, snode)
3859
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
3860
    else:
3861
      dev_sstatus = None
3862

    
3863
    if dev.children:
3864
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
3865
                      for child in dev.children]
3866
    else:
3867
      dev_children = []
3868

    
3869
    data = {
3870
      "iv_name": dev.iv_name,
3871
      "dev_type": dev.dev_type,
3872
      "logical_id": dev.logical_id,
3873
      "physical_id": dev.physical_id,
3874
      "pstatus": dev_pstatus,
3875
      "sstatus": dev_sstatus,
3876
      "children": dev_children,
3877
      }
3878

    
3879
    return data
3880

    
3881
  def Exec(self, feedback_fn):
3882
    """Gather and return data"""
3883
    result = {}
3884
    for instance in self.wanted_instances:
3885
      remote_info = rpc.call_instance_info(instance.primary_node,
3886
                                                instance.name)
3887
      if remote_info and "state" in remote_info:
3888
        remote_state = "up"
3889
      else:
3890
        remote_state = "down"
3891
      if instance.status == "down":
3892
        config_state = "down"
3893
      else:
3894
        config_state = "up"
3895

    
3896
      disks = [self._ComputeDiskStatus(instance, None, device)
3897
               for device in instance.disks]
3898

    
3899
      idict = {
3900
        "name": instance.name,
3901
        "config_state": config_state,
3902
        "run_state": remote_state,
3903
        "pnode": instance.primary_node,
3904
        "snodes": instance.secondary_nodes,
3905
        "os": instance.os,
3906
        "memory": instance.memory,
3907
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
3908
        "disks": disks,
3909
        "vcpus": instance.vcpus,
3910
        }
3911

    
3912
      result[instance.name] = idict
3913

    
3914
    return result
3915

    
3916

    
3917
class LUSetInstanceParms(LogicalUnit):
3918
  """Modifies an instances's parameters.
3919

3920
  """
3921
  HPATH = "instance-modify"
3922
  HTYPE = constants.HTYPE_INSTANCE
3923
  _OP_REQP = ["instance_name"]
3924

    
3925
  def BuildHooksEnv(self):
3926
    """Build hooks env.
3927

3928
    This runs on the master, primary and secondaries.
3929

3930
    """
3931
    args = dict()
3932
    if self.mem:
3933
      args['memory'] = self.mem
3934
    if self.vcpus:
3935
      args['vcpus'] = self.vcpus
3936
    if self.do_ip or self.do_bridge:
3937
      if self.do_ip:
3938
        ip = self.ip
3939
      else:
3940
        ip = self.instance.nics[0].ip
3941
      if self.bridge:
3942
        bridge = self.bridge
3943
      else:
3944
        bridge = self.instance.nics[0].bridge
3945
      args['nics'] = [(ip, bridge)]
3946
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
3947
    nl = [self.sstore.GetMasterNode(),
3948
          self.instance.primary_node] + list(self.instance.secondary_nodes)
3949
    return env, nl, nl
3950

    
3951
  def CheckPrereq(self):
3952
    """Check prerequisites.
3953

3954
    This only checks the instance list against the existing names.
3955

3956
    """
3957
    self.mem = getattr(self.op, "mem", None)
3958
    self.vcpus = getattr(self.op, "vcpus", None)
3959
    self.ip = getattr(self.op, "ip", None)
3960
    self.bridge = getattr(self.op, "bridge", None)
3961
    if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
3962
      raise errors.OpPrereqError("No changes submitted")
3963
    if self.mem is not None:
3964
      try:
3965
        self.mem = int(self.mem)
3966
      except ValueError, err:
3967
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
3968
    if self.vcpus is not None:
3969
      try:
3970
        self.vcpus = int(self.vcpus)
3971
      except ValueError, err:
3972
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
3973
    if self.ip is not None:
3974
      self.do_ip = True
3975
      if self.ip.lower() == "none":
3976
        self.ip = None
3977
      else:
3978
        if not utils.IsValidIP(self.ip):
3979
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
3980
    else:
3981
      self.do_ip = False
3982
    self.do_bridge = (self.bridge is not None)
3983

    
3984
    instance = self.cfg.GetInstanceInfo(
3985
      self.cfg.ExpandInstanceName(self.op.instance_name))
3986
    if instance is None:
3987
      raise errors.OpPrereqError("No such instance name '%s'" %
3988
                                 self.op.instance_name)
3989
    self.op.instance_name = instance.name
3990
    self.instance = instance
3991
    return
3992

    
3993
  def Exec(self, feedback_fn):
3994
    """Modifies an instance.
3995

3996
    All parameters take effect only at the next restart of the instance.
3997
    """
3998
    result = []
3999
    instance = self.instance
4000
    if self.mem:
4001
      instance.memory = self.mem
4002
      result.append(("mem", self.mem))
4003
    if self.vcpus:
4004
      instance.vcpus = self.vcpus
4005
      result.append(("vcpus",  self.vcpus))
4006
    if self.do_ip:
4007
      instance.nics[0].ip = self.ip
4008
      result.append(("ip", self.ip))
4009
    if self.bridge:
4010
      instance.nics[0].bridge = self.bridge
4011
      result.append(("bridge", self.bridge))
4012

    
4013
    self.cfg.AddInstance(instance)
4014

    
4015
    return result
4016

    
4017

    
4018
class LUQueryExports(NoHooksLU):
4019
  """Query the exports list
4020

4021
  """
4022
  _OP_REQP = []
4023

    
4024
  def CheckPrereq(self):
4025
    """Check that the nodelist contains only existing nodes.
4026

4027
    """
4028
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4029

    
4030
  def Exec(self, feedback_fn):
4031
    """Compute the list of all the exported system images.
4032

4033
    Returns:
4034
      a dictionary with the structure node->(export-list)
4035
      where export-list is a list of the instances exported on
4036
      that node.
4037

4038
    """
4039
    return rpc.call_export_list(self.nodes)
4040

    
4041

    
4042
class LUExportInstance(LogicalUnit):
4043
  """Export an instance to an image in the cluster.
4044

4045
  """
4046
  HPATH = "instance-export"
4047
  HTYPE = constants.HTYPE_INSTANCE
4048
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
4049

    
4050
  def BuildHooksEnv(self):
4051
    """Build hooks env.
4052

4053
    This will run on the master, primary node and target node.
4054

4055
    """
4056
    env = {
4057
      "EXPORT_NODE": self.op.target_node,
4058
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4059
      }
4060
    env.update(_BuildInstanceHookEnvByObject(self.instance))
4061
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4062
          self.op.target_node]
4063
    return env, nl, nl
4064

    
4065
  def CheckPrereq(self):
4066
    """Check prerequisites.
4067

4068
    This checks that the instance exists and that the target node is valid.
4069

4070
    """
4071
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4072
    self.instance = self.cfg.GetInstanceInfo(instance_name)
4073
    if self.instance is None:
4074
      raise errors.OpPrereqError("Instance '%s' not found" %
4075
                                 self.op.instance_name)
4076

    
4077
    # node verification
4078
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4079
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4080

    
4081
    if self.dst_node is None:
4082
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
4083
                                 self.op.target_node)
4084
    self.op.target_node = self.dst_node.name
4085

    
4086
  def Exec(self, feedback_fn):
4087
    """Export an instance to an image in the cluster.
4088

4089
    """
4090
    instance = self.instance
4091
    dst_node = self.dst_node
4092
    src_node = instance.primary_node
4093
    # shutdown the instance, unless requested not to do so
4094
    if self.op.shutdown:
4095
      op = opcodes.OpShutdownInstance(instance_name=instance.name)
4096
      self.processor.ChainOpCode(op)
4097

    
4098
    vgname = self.cfg.GetVGName()
4099

    
4100
    snap_disks = []
4101

    
4102
    try:
4103
      for disk in instance.disks:
4104
        if disk.iv_name == "sda":
4105
          # new_dev_name will be a snapshot of an LVM leaf of the disk we passed in
4106
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4107

    
4108
          if not new_dev_name:
4109
            logger.Error("could not snapshot block device %s on node %s" %
4110
                         (disk.logical_id[1], src_node))
4111
          else:
4112
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4113
                                      logical_id=(vgname, new_dev_name),
4114
                                      physical_id=(vgname, new_dev_name),
4115
                                      iv_name=disk.iv_name)
4116
            snap_disks.append(new_dev)
4117

    
4118
    finally:
4119
      if self.op.shutdown:
4120
        op = opcodes.OpStartupInstance(instance_name=instance.name,
4121
                                       force=False)
4122
        self.processor.ChainOpCode(op)
4123

    
4124
    # TODO: check for size
4125

    
4126
    for dev in snap_disks:
4127
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
4128
                                           instance):
4129
        logger.Error("could not export block device %s from node"
4130
                     " %s to node %s" %
4131
                     (dev.logical_id[1], src_node, dst_node.name))
4132
      if not rpc.call_blockdev_remove(src_node, dev):
4133
        logger.Error("could not remove snapshot block device %s from"
4134
                     " node %s" % (dev.logical_id[1], src_node))
4135

    
4136
    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4137
      logger.Error("could not finalize export for instance %s on node %s" %
4138
                   (instance.name, dst_node.name))
4139

    
4140
    nodelist = self.cfg.GetNodeList()
4141
    nodelist.remove(dst_node.name)
4142

    
4143
    # on one-node clusters nodelist will be empty after the removal
4144
    # if we proceed the backup would be removed because OpQueryExports
4145
    # substitutes an empty list with the full cluster node list.
4146
    if nodelist:
4147
      op = opcodes.OpQueryExports(nodes=nodelist)
4148
      exportlist = self.processor.ChainOpCode(op)
4149
      for node in exportlist:
4150
        if instance.name in exportlist[node]:
4151
          if not rpc.call_export_remove(node, instance.name):
4152
            logger.Error("could not remove older export for instance %s"
4153
                         " on node %s" % (instance.name, node))
4154

    
4155

    
4156
class TagsLU(NoHooksLU):
4157
  """Generic tags LU.
4158

4159
  This is an abstract class which is the parent of all the other tags LUs.
4160

4161
  """
4162
  def CheckPrereq(self):
4163
    """Check prerequisites.
4164

4165
    """
4166
    if self.op.kind == constants.TAG_CLUSTER:
4167
      self.target = self.cfg.GetClusterInfo()
4168
    elif self.op.kind == constants.TAG_NODE:
4169
      name = self.cfg.ExpandNodeName(self.op.name)
4170
      if name is None:
4171
        raise errors.OpPrereqError("Invalid node name (%s)" %
4172
                                   (self.op.name,))
4173
      self.op.name = name
4174
      self.target = self.cfg.GetNodeInfo(name)
4175
    elif self.op.kind == constants.TAG_INSTANCE:
4176
      name = self.cfg.ExpandInstanceName(self.op.name)
4177
      if name is None:
4178
        raise errors.OpPrereqError("Invalid instance name (%s)" %
4179
                                   (self.op.name,))
4180
      self.op.name = name
4181
      self.target = self.cfg.GetInstanceInfo(name)
4182
    else:
4183
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4184
                                 str(self.op.kind))
4185

    
4186

    
4187
class LUGetTags(TagsLU):
4188
  """Returns the tags of a given object.
4189

4190
  """
4191
  _OP_REQP = ["kind", "name"]
4192

    
4193
  def Exec(self, feedback_fn):
4194
    """Returns the tag list.
4195

4196
    """
4197
    return self.target.GetTags()
4198

    
4199

    
4200
class LUSearchTags(NoHooksLU):
4201
  """Searches the tags for a given pattern.
4202

4203
  """
4204
  _OP_REQP = ["pattern"]
4205

    
4206
  def CheckPrereq(self):
4207
    """Check prerequisites.
4208

4209
    This checks the pattern passed for validity by compiling it.
4210

4211
    """
4212
    try:
4213
      self.re = re.compile(self.op.pattern)
4214
    except re.error, err:
4215
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4216
                                 (self.op.pattern, err))
4217

    
4218
  def Exec(self, feedback_fn):
4219
    """Returns the tag list.
4220

4221
    """
4222
    cfg = self.cfg
4223
    tgts = [("/cluster", cfg.GetClusterInfo())]
4224
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4225
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4226
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4227
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4228
    results = []
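    # results will hold (path, tag) pairs, e.g.
    # ("/instances/instance1.example.com", "web") -- the names here are
    # purely illustrative; the paths come from the /cluster,
    # /instances/<name> and /nodes/<name> targets built above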
4229
    for path, target in tgts:
4230
      for tag in target.GetTags():
4231
        if self.re.search(tag):
4232
          results.append((path, tag))
4233
    return results
4234

    
4235

    
4236
class LUAddTags(TagsLU):
4237
  """Sets a tag on a given object.
4238

4239
  """
4240
  _OP_REQP = ["kind", "name", "tags"]
4241

    
4242
  def CheckPrereq(self):
4243
    """Check prerequisites.
4244

4245
    This checks the type and length of each of the tags passed.
4246

4247
    """
4248
    TagsLU.CheckPrereq(self)
4249
    for tag in self.op.tags:
4250
      objects.TaggableObject.ValidateTag(tag)
4251

    
4252
  def Exec(self, feedback_fn):
4253
    """Sets the tag.
4254

4255
    """
4256
    try:
4257
      for tag in self.op.tags:
4258
        self.target.AddTag(tag)
4259
    except errors.TagError, err:
4260
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
4261
    try:
4262
      self.cfg.Update(self.target)
4263
    except errors.ConfigurationError:
4264
      raise errors.OpRetryError("There has been a modification to the"
4265
                                " config file and the operation has been"
4266
                                " aborted. Please retry.")
4267

    
4268

    
4269
class LUDelTags(TagsLU):
4270
  """Delete a list of tags from a given object.
4271

4272
  """
4273
  _OP_REQP = ["kind", "name", "tags"]
4274

    
4275
  def CheckPrereq(self):
4276
    """Check prerequisites.
4277

4278
    This checks that we have the given tag.
4279

4280
    """
4281
    TagsLU.CheckPrereq(self)
4282
    for tag in self.op.tags:
4283
      objects.TaggableObject.ValidateTag(tag)
4284
    del_tags = frozenset(self.op.tags)
4285
    cur_tags = self.target.GetTags()
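    # del_tags <= cur_tags is a frozenset subset test; the error below is
    # raised if any of the requested tags is not currently set on the object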
4286
    if not del_tags <= cur_tags:
4287
      diff_tags = del_tags - cur_tags
4288
      diff_names = ["'%s'" % tag for tag in diff_tags]
4289
      diff_names.sort()
4290
      raise errors.OpPrereqError("Tag(s) %s not found" %
4291
                                 (",".join(diff_names)))
4292

    
4293
  def Exec(self, feedback_fn):
4294
    """Remove the tag from the object.
4295

4296
    """
4297
    for tag in self.op.tags:
4298
      self.target.RemoveTag(tag)
4299
    try:
4300
      self.cfg.Update(self.target)
4301
    except errors.ConfigurationError:
4302
      raise errors.OpRetryError("There has been a modification to the"
4303
                                " config file and the operation has been"
4304
                                " aborted. Please retry.")