Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib.py @ 0834c866

History | View | Annotate | Download (144 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33

    
34
from ganeti import rpc
35
from ganeti import ssh
36
from ganeti import logger
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import config
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import opcodes
44
from ganeti import ssconf
45

    
46
class LogicalUnit(object):
47
  """Logical Unit base class.
48

49
  Subclasses must follow these rules:
50
    - implement CheckPrereq which also fills in the opcode instance
51
      with all the fields (even if as None)
52
    - implement Exec
53
    - implement BuildHooksEnv
54
    - redefine HPATH and HTYPE
55
    - optionally redefine their run requirements (REQ_CLUSTER,
56
      REQ_MASTER); note that all commands require root permissions
57

58
  """
59
  HPATH = None
60
  HTYPE = None
61
  _OP_REQP = []
62
  REQ_CLUSTER = True
63
  REQ_MASTER = True
64

    
65
  def __init__(self, processor, op, cfg, sstore):
66
    """Constructor for LogicalUnit.
67

68
    This needs to be overriden in derived classes in order to check op
69
    validity.
70

71
    """
72
    self.processor = processor
73
    self.op = op
74
    self.cfg = cfg
75
    self.sstore = sstore
76
    for attr_name in self._OP_REQP:
77
      attr_val = getattr(op, attr_name, None)
78
      if attr_val is None:
79
        raise errors.OpPrereqError("Required parameter '%s' missing" %
80
                                   attr_name)
81
    if self.REQ_CLUSTER:
82
      if not cfg.IsCluster():
83
        raise errors.OpPrereqError("Cluster not initialized yet,"
84
                                   " use 'gnt-cluster init' first.")
85
      if self.REQ_MASTER:
86
        master = sstore.GetMasterNode()
87
        if master != utils.HostInfo().name:
88
          raise errors.OpPrereqError("Commands must be run on the master"
89
                                     " node %s" % master)
90

    
91
  def CheckPrereq(self):
92
    """Check prerequisites for this LU.
93

94
    This method should check that the prerequisites for the execution
95
    of this LU are fulfilled. It can do internode communication, but
96
    it should be idempotent - no cluster or system changes are
97
    allowed.
98

99
    The method should raise errors.OpPrereqError in case something is
100
    not fulfilled. Its return value is ignored.
101

102
    This method should also update all the parameters of the opcode to
103
    their canonical form; e.g. a short node name must be fully
104
    expanded after this method has successfully completed (so that
105
    hooks, logging, etc. work correctly).
106

107
    """
108
    raise NotImplementedError
109

    
110
  def Exec(self, feedback_fn):
111
    """Execute the LU.
112

113
    This method should implement the actual work. It should raise
114
    errors.OpExecError for failures that are somewhat dealt with in
115
    code, or expected.
116

117
    """
118
    raise NotImplementedError
119

    
120
  def BuildHooksEnv(self):
121
    """Build hooks environment for this LU.
122

123
    This method should return a three-node tuple consisting of: a dict
124
    containing the environment that will be used for running the
125
    specific hook for this LU, a list of node names on which the hook
126
    should run before the execution, and a list of node names on which
127
    the hook should run after the execution.
128

129
    The keys of the dict must not have 'GANETI_' prefixed as this will
130
    be handled in the hooks runner. Also note additional keys will be
131
    added by the hooks runner. If the LU doesn't define any
132
    environment, an empty dict (and not None) should be returned.
133

134
    As for the node lists, the master should not be included in the
135
    them, as it will be added by the hooks runner in case this LU
136
    requires a cluster to run on (otherwise we don't have a node
137
    list). No nodes should be returned as an empty list (and not
138
    None).
139

140
    Note that if the HPATH for a LU class is None, this function will
141
    not be called.
142

143
    """
144
    raise NotImplementedError
145

    
146

    
147
class NoHooksLU(LogicalUnit):
148
  """Simple LU which runs no hooks.
149

150
  This LU is intended as a parent for other LogicalUnits which will
151
  run no hooks, in order to reduce duplicate code.
152

153
  """
154
  HPATH = None
155
  HTYPE = None
156

    
157
  def BuildHooksEnv(self):
158
    """Build hooks env.
159

160
    This is a no-op, since we don't run hooks.
161

162
    """
163
    return {}, [], []
164

    
165

    
166
def _GetWantedNodes(lu, nodes):
167
  """Returns list of checked and expanded node names.
168

169
  Args:
170
    nodes: List of nodes (strings) or None for all
171

172
  """
173
  if not isinstance(nodes, list):
174
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
175

    
176
  if nodes:
177
    wanted = []
178

    
179
    for name in nodes:
180
      node = lu.cfg.ExpandNodeName(name)
181
      if node is None:
182
        raise errors.OpPrereqError("No such node name '%s'" % name)
183
      wanted.append(node)
184

    
185
  else:
186
    wanted = lu.cfg.GetNodeList()
187
  return utils.NiceSort(wanted)
188

    
189

    
190
def _GetWantedInstances(lu, instances):
191
  """Returns list of checked and expanded instance names.
192

193
  Args:
194
    instances: List of instances (strings) or None for all
195

196
  """
197
  if not isinstance(instances, list):
198
    raise errors.OpPrereqError("Invalid argument type 'instances'")
199

    
200
  if instances:
201
    wanted = []
202

    
203
    for name in instances:
204
      instance = lu.cfg.ExpandInstanceName(name)
205
      if instance is None:
206
        raise errors.OpPrereqError("No such instance name '%s'" % name)
207
      wanted.append(instance)
208

    
209
  else:
210
    wanted = lu.cfg.GetInstanceList()
211
  return utils.NiceSort(wanted)
212

    
213

    
214
def _CheckOutputFields(static, dynamic, selected):
215
  """Checks whether all selected fields are valid.
216

217
  Args:
218
    static: Static fields
219
    dynamic: Dynamic fields
220

221
  """
222
  static_fields = frozenset(static)
223
  dynamic_fields = frozenset(dynamic)
224

    
225
  all_fields = static_fields | dynamic_fields
226

    
227
  if not all_fields.issuperset(selected):
228
    raise errors.OpPrereqError("Unknown output fields selected: %s"
229
                               % ",".join(frozenset(selected).
230
                                          difference(all_fields)))
231

    
232

    
233
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
234
                          memory, vcpus, nics):
235
  """Builds instance related env variables for hooks from single variables.
236

237
  Args:
238
    secondary_nodes: List of secondary nodes as strings
239
  """
240
  env = {
241
    "OP_TARGET": name,
242
    "INSTANCE_NAME": name,
243
    "INSTANCE_PRIMARY": primary_node,
244
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
245
    "INSTANCE_OS_TYPE": os_type,
246
    "INSTANCE_STATUS": status,
247
    "INSTANCE_MEMORY": memory,
248
    "INSTANCE_VCPUS": vcpus,
249
  }
250

    
251
  if nics:
252
    nic_count = len(nics)
253
    for idx, (ip, bridge) in enumerate(nics):
254
      if ip is None:
255
        ip = ""
256
      env["INSTANCE_NIC%d_IP" % idx] = ip
257
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
258
  else:
259
    nic_count = 0
260

    
261
  env["INSTANCE_NIC_COUNT"] = nic_count
262

    
263
  return env
264

    
265

    
266
def _BuildInstanceHookEnvByObject(instance, override=None):
267
  """Builds instance related env variables for hooks from an object.
268

269
  Args:
270
    instance: objects.Instance object of instance
271
    override: dict of values to override
272
  """
273
  args = {
274
    'name': instance.name,
275
    'primary_node': instance.primary_node,
276
    'secondary_nodes': instance.secondary_nodes,
277
    'os_type': instance.os,
278
    'status': instance.os,
279
    'memory': instance.memory,
280
    'vcpus': instance.vcpus,
281
    'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
282
  }
283
  if override:
284
    args.update(override)
285
  return _BuildInstanceHookEnv(**args)
286

    
287

    
288
def _UpdateEtcHosts(fullnode, ip):
289
  """Ensure a node has a correct entry in /etc/hosts.
290

291
  Args:
292
    fullnode - Fully qualified domain name of host. (str)
293
    ip       - IPv4 address of host (str)
294

295
  """
296
  node = fullnode.split(".", 1)[0]
297

    
298
  f = open('/etc/hosts', 'r+')
299

    
300
  inthere = False
301

    
302
  save_lines = []
303
  add_lines = []
304
  removed = False
305

    
306
  while True:
307
    rawline = f.readline()
308

    
309
    if not rawline:
310
      # End of file
311
      break
312

    
313
    line = rawline.split('\n')[0]
314

    
315
    # Strip off comments
316
    line = line.split('#')[0]
317

    
318
    if not line:
319
      # Entire line was comment, skip
320
      save_lines.append(rawline)
321
      continue
322

    
323
    fields = line.split()
324

    
325
    haveall = True
326
    havesome = False
327
    for spec in [ ip, fullnode, node ]:
328
      if spec not in fields:
329
        haveall = False
330
      if spec in fields:
331
        havesome = True
332

    
333
    if haveall:
334
      inthere = True
335
      save_lines.append(rawline)
336
      continue
337

    
338
    if havesome and not haveall:
339
      # Line (old, or manual?) which is missing some.  Remove.
340
      removed = True
341
      continue
342

    
343
    save_lines.append(rawline)
344

    
345
  if not inthere:
346
    add_lines.append('%s\t%s %s\n' % (ip, fullnode, node))
347

    
348
  if removed:
349
    if add_lines:
350
      save_lines = save_lines + add_lines
351

    
352
    # We removed a line, write a new file and replace old.
353
    fd, tmpname = tempfile.mkstemp('tmp', 'hosts_', '/etc')
354
    newfile = os.fdopen(fd, 'w')
355
    newfile.write(''.join(save_lines))
356
    newfile.close()
357
    os.rename(tmpname, '/etc/hosts')
358

    
359
  elif add_lines:
360
    # Simply appending a new line will do the trick.
361
    f.seek(0, 2)
362
    for add in add_lines:
363
      f.write(add)
364

    
365
  f.close()
366

    
367

    
368
def _UpdateKnownHosts(fullnode, ip, pubkey):
369
  """Ensure a node has a correct known_hosts entry.
370

371
  Args:
372
    fullnode - Fully qualified domain name of host. (str)
373
    ip       - IPv4 address of host (str)
374
    pubkey   - the public key of the cluster
375

376
  """
377
  if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
378
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
379
  else:
380
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
381

    
382
  inthere = False
383

    
384
  save_lines = []
385
  add_lines = []
386
  removed = False
387

    
388
  for rawline in f:
389
    logger.Debug('read %s' % (repr(rawline),))
390

    
391
    parts = rawline.rstrip('\r\n').split()
392

    
393
    # Ignore unwanted lines
394
    if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
395
      fields = parts[0].split(',')
396
      key = parts[2]
397

    
398
      haveall = True
399
      havesome = False
400
      for spec in [ ip, fullnode ]:
401
        if spec not in fields:
402
          haveall = False
403
        if spec in fields:
404
          havesome = True
405

    
406
      logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
407
      if haveall and key == pubkey:
408
        inthere = True
409
        save_lines.append(rawline)
410
        logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
411
        continue
412

    
413
      if havesome and (not haveall or key != pubkey):
414
        removed = True
415
        logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
416
        continue
417

    
418
    save_lines.append(rawline)
419

    
420
  if not inthere:
421
    add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
422
    logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
423

    
424
  if removed:
425
    save_lines = save_lines + add_lines
426

    
427
    # Write a new file and replace old.
428
    fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
429
                                   constants.DATA_DIR)
430
    newfile = os.fdopen(fd, 'w')
431
    try:
432
      newfile.write(''.join(save_lines))
433
    finally:
434
      newfile.close()
435
    logger.Debug("Wrote new known_hosts.")
436
    os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
437

    
438
  elif add_lines:
439
    # Simply appending a new line will do the trick.
440
    f.seek(0, 2)
441
    for add in add_lines:
442
      f.write(add)
443

    
444
  f.close()
445

    
446

    
447
def _HasValidVG(vglist, vgname):
448
  """Checks if the volume group list is valid.
449

450
  A non-None return value means there's an error, and the return value
451
  is the error message.
452

453
  """
454
  vgsize = vglist.get(vgname, None)
455
  if vgsize is None:
456
    return "volume group '%s' missing" % vgname
457
  elif vgsize < 20480:
458
    return ("volume group '%s' too small (20480MiB required, %dMib found)" %
459
            (vgname, vgsize))
460
  return None
461

    
462

    
463
def _InitSSHSetup(node):
464
  """Setup the SSH configuration for the cluster.
465

466

467
  This generates a dsa keypair for root, adds the pub key to the
468
  permitted hosts and adds the hostkey to its own known hosts.
469

470
  Args:
471
    node: the name of this host as a fqdn
472

473
  """
474
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
475

    
476
  for name in priv_key, pub_key:
477
    if os.path.exists(name):
478
      utils.CreateBackup(name)
479
    utils.RemoveFile(name)
480

    
481
  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
482
                         "-f", priv_key,
483
                         "-q", "-N", ""])
484
  if result.failed:
485
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
486
                             result.output)
487

    
488
  f = open(pub_key, 'r')
489
  try:
490
    utils.AddAuthorizedKey(auth_keys, f.read(8192))
491
  finally:
492
    f.close()
493

    
494

    
495
def _InitGanetiServerSetup(ss):
496
  """Setup the necessary configuration for the initial node daemon.
497

498
  This creates the nodepass file containing the shared password for
499
  the cluster and also generates the SSL certificate.
500

501
  """
502
  # Create pseudo random password
503
  randpass = sha.new(os.urandom(64)).hexdigest()
504
  # and write it into sstore
505
  ss.SetKey(ss.SS_NODED_PASS, randpass)
506

    
507
  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
508
                         "-days", str(365*5), "-nodes", "-x509",
509
                         "-keyout", constants.SSL_CERT_FILE,
510
                         "-out", constants.SSL_CERT_FILE, "-batch"])
511
  if result.failed:
512
    raise errors.OpExecError("could not generate server ssl cert, command"
513
                             " %s had exitcode %s and error message %s" %
514
                             (result.cmd, result.exit_code, result.output))
515

    
516
  os.chmod(constants.SSL_CERT_FILE, 0400)
517

    
518
  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
519

    
520
  if result.failed:
521
    raise errors.OpExecError("Could not start the node daemon, command %s"
522
                             " had exitcode %s and error %s" %
523
                             (result.cmd, result.exit_code, result.output))
524

    
525

    
526
def _CheckInstanceBridgesExist(instance):
527
  """Check that the brigdes needed by an instance exist.
528

529
  """
530
  # check bridges existance
531
  brlist = [nic.bridge for nic in instance.nics]
532
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
533
    raise errors.OpPrereqError("one or more target bridges %s does not"
534
                               " exist on destination node '%s'" %
535
                               (brlist, instance.primary_node))
536

    
537

    
538
class LUInitCluster(LogicalUnit):
539
  """Initialise the cluster.
540

541
  """
542
  HPATH = "cluster-init"
543
  HTYPE = constants.HTYPE_CLUSTER
544
  _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
545
              "def_bridge", "master_netdev"]
546
  REQ_CLUSTER = False
547

    
548
  def BuildHooksEnv(self):
549
    """Build hooks env.
550

551
    Notes: Since we don't require a cluster, we must manually add
552
    ourselves in the post-run node list.
553

554
    """
555
    env = {"OP_TARGET": self.op.cluster_name}
556
    return env, [], [self.hostname.name]
557

    
558
  def CheckPrereq(self):
559
    """Verify that the passed name is a valid one.
560

561
    """
562
    if config.ConfigWriter.IsCluster():
563
      raise errors.OpPrereqError("Cluster is already initialised")
564

    
565
    self.hostname = hostname = utils.HostInfo()
566

    
567
    if hostname.ip.startswith("127."):
568
      raise errors.OpPrereqError("This host's IP resolves to the private"
569
                                 " range (%s). Please fix DNS or /etc/hosts." %
570
                                 (hostname.ip,))
571

    
572
    self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
573

    
574
    if not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, hostname.ip,
575
                         constants.DEFAULT_NODED_PORT):
576
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
577
                                 " to %s,\nbut this ip address does not"
578
                                 " belong to this host."
579
                                 " Aborting." % hostname.ip)
580

    
581
    secondary_ip = getattr(self.op, "secondary_ip", None)
582
    if secondary_ip and not utils.IsValidIP(secondary_ip):
583
      raise errors.OpPrereqError("Invalid secondary ip given")
584
    if (secondary_ip and
585
        secondary_ip != hostname.ip and
586
        (not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, secondary_ip,
587
                           constants.DEFAULT_NODED_PORT))):
588
      raise errors.OpPrereqError("You gave %s as secondary IP,\n"
589
                                 "but it does not belong to this host." %
590
                                 secondary_ip)
591
    self.secondary_ip = secondary_ip
592

    
593
    # checks presence of the volume group given
594
    vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
595

    
596
    if vgstatus:
597
      raise errors.OpPrereqError("Error: %s" % vgstatus)
598

    
599
    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
600
                    self.op.mac_prefix):
601
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
602
                                 self.op.mac_prefix)
603

    
604
    if self.op.hypervisor_type not in hypervisor.VALID_HTYPES:
605
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
606
                                 self.op.hypervisor_type)
607

    
608
    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
609
    if result.failed:
610
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
611
                                 (self.op.master_netdev,
612
                                  result.output.strip()))
613

    
614
    if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
615
            os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
616
      raise errors.OpPrereqError("Init.d script '%s' missing or not "
617
                                 "executable." % constants.NODE_INITD_SCRIPT)
618

    
619
  def Exec(self, feedback_fn):
620
    """Initialize the cluster.
621

622
    """
623
    clustername = self.clustername
624
    hostname = self.hostname
625

    
626
    # set up the simple store
627
    self.sstore = ss = ssconf.SimpleStore()
628
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
629
    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
630
    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
631
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
632
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
633

    
634
    # set up the inter-node password and certificate
635
    _InitGanetiServerSetup(ss)
636

    
637
    # start the master ip
638
    rpc.call_node_start_master(hostname.name)
639

    
640
    # set up ssh config and /etc/hosts
641
    f = open(constants.SSH_HOST_RSA_PUB, 'r')
642
    try:
643
      sshline = f.read()
644
    finally:
645
      f.close()
646
    sshkey = sshline.split(" ")[1]
647

    
648
    _UpdateEtcHosts(hostname.name, hostname.ip)
649

    
650
    _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
651

    
652
    _InitSSHSetup(hostname.name)
653

    
654
    # init of cluster config file
655
    self.cfg = cfgw = config.ConfigWriter()
656
    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
657
                    sshkey, self.op.mac_prefix,
658
                    self.op.vg_name, self.op.def_bridge)
659

    
660

    
661
class LUDestroyCluster(NoHooksLU):
662
  """Logical unit for destroying the cluster.
663

664
  """
665
  _OP_REQP = []
666

    
667
  def CheckPrereq(self):
668
    """Check prerequisites.
669

670
    This checks whether the cluster is empty.
671

672
    Any errors are signalled by raising errors.OpPrereqError.
673

674
    """
675
    master = self.sstore.GetMasterNode()
676

    
677
    nodelist = self.cfg.GetNodeList()
678
    if len(nodelist) != 1 or nodelist[0] != master:
679
      raise errors.OpPrereqError("There are still %d node(s) in"
680
                                 " this cluster." % (len(nodelist) - 1))
681
    instancelist = self.cfg.GetInstanceList()
682
    if instancelist:
683
      raise errors.OpPrereqError("There are still %d instance(s) in"
684
                                 " this cluster." % len(instancelist))
685

    
686
  def Exec(self, feedback_fn):
687
    """Destroys the cluster.
688

689
    """
690
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
691
    utils.CreateBackup(priv_key)
692
    utils.CreateBackup(pub_key)
693
    rpc.call_node_leave_cluster(self.sstore.GetMasterNode())
694

    
695

    
696
class LUVerifyCluster(NoHooksLU):
697
  """Verifies the cluster status.
698

699
  """
700
  _OP_REQP = []
701

    
702
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
703
                  remote_version, feedback_fn):
704
    """Run multiple tests against a node.
705

706
    Test list:
707
      - compares ganeti version
708
      - checks vg existance and size > 20G
709
      - checks config file checksum
710
      - checks ssh to other nodes
711

712
    Args:
713
      node: name of the node to check
714
      file_list: required list of files
715
      local_cksum: dictionary of local files and their checksums
716

717
    """
718
    # compares ganeti version
719
    local_version = constants.PROTOCOL_VERSION
720
    if not remote_version:
721
      feedback_fn(" - ERROR: connection to %s failed" % (node))
722
      return True
723

    
724
    if local_version != remote_version:
725
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
726
                      (local_version, node, remote_version))
727
      return True
728

    
729
    # checks vg existance and size > 20G
730

    
731
    bad = False
732
    if not vglist:
733
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
734
                      (node,))
735
      bad = True
736
    else:
737
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
738
      if vgstatus:
739
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
740
        bad = True
741

    
742
    # checks config file checksum
743
    # checks ssh to any
744

    
745
    if 'filelist' not in node_result:
746
      bad = True
747
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
748
    else:
749
      remote_cksum = node_result['filelist']
750
      for file_name in file_list:
751
        if file_name not in remote_cksum:
752
          bad = True
753
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
754
        elif remote_cksum[file_name] != local_cksum[file_name]:
755
          bad = True
756
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
757

    
758
    if 'nodelist' not in node_result:
759
      bad = True
760
      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
761
    else:
762
      if node_result['nodelist']:
763
        bad = True
764
        for node in node_result['nodelist']:
765
          feedback_fn("  - ERROR: communication with node '%s': %s" %
766
                          (node, node_result['nodelist'][node]))
767
    hyp_result = node_result.get('hypervisor', None)
768
    if hyp_result is not None:
769
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
770
    return bad
771

    
772
  def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
773
    """Verify an instance.
774

775
    This function checks to see if the required block devices are
776
    available on the instance's node.
777

778
    """
779
    bad = False
780

    
781
    instancelist = self.cfg.GetInstanceList()
782
    if not instance in instancelist:
783
      feedback_fn("  - ERROR: instance %s not in instance list %s" %
784
                      (instance, instancelist))
785
      bad = True
786

    
787
    instanceconfig = self.cfg.GetInstanceInfo(instance)
788
    node_current = instanceconfig.primary_node
789

    
790
    node_vol_should = {}
791
    instanceconfig.MapLVsByNode(node_vol_should)
792

    
793
    for node in node_vol_should:
794
      for volume in node_vol_should[node]:
795
        if node not in node_vol_is or volume not in node_vol_is[node]:
796
          feedback_fn("  - ERROR: volume %s missing on node %s" %
797
                          (volume, node))
798
          bad = True
799

    
800
    if not instanceconfig.status == 'down':
801
      if not instance in node_instance[node_current]:
802
        feedback_fn("  - ERROR: instance %s not running on node %s" %
803
                        (instance, node_current))
804
        bad = True
805

    
806
    for node in node_instance:
807
      if (not node == node_current):
808
        if instance in node_instance[node]:
809
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
810
                          (instance, node))
811
          bad = True
812

    
813
    return bad
814

    
815
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
816
    """Verify if there are any unknown volumes in the cluster.
817

818
    The .os, .swap and backup volumes are ignored. All other volumes are
819
    reported as unknown.
820

821
    """
822
    bad = False
823

    
824
    for node in node_vol_is:
825
      for volume in node_vol_is[node]:
826
        if node not in node_vol_should or volume not in node_vol_should[node]:
827
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
828
                      (volume, node))
829
          bad = True
830
    return bad
831

    
832
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
833
    """Verify the list of running instances.
834

835
    This checks what instances are running but unknown to the cluster.
836

837
    """
838
    bad = False
839
    for node in node_instance:
840
      for runninginstance in node_instance[node]:
841
        if runninginstance not in instancelist:
842
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
843
                          (runninginstance, node))
844
          bad = True
845
    return bad
846

    
847
  def CheckPrereq(self):
848
    """Check prerequisites.
849

850
    This has no prerequisites.
851

852
    """
853
    pass
854

    
855
  def Exec(self, feedback_fn):
856
    """Verify integrity of cluster, performing various test on nodes.
857

858
    """
859
    bad = False
860
    feedback_fn("* Verifying global settings")
861
    self.cfg.VerifyConfig()
862

    
863
    vg_name = self.cfg.GetVGName()
864
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
865
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
866
    node_volume = {}
867
    node_instance = {}
868

    
869
    # FIXME: verify OS list
870
    # do local checksums
871
    file_names = list(self.sstore.GetFileList())
872
    file_names.append(constants.SSL_CERT_FILE)
873
    file_names.append(constants.CLUSTER_CONF_FILE)
874
    local_checksums = utils.FingerprintFiles(file_names)
875

    
876
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
877
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
878
    all_instanceinfo = rpc.call_instance_list(nodelist)
879
    all_vglist = rpc.call_vg_list(nodelist)
880
    node_verify_param = {
881
      'filelist': file_names,
882
      'nodelist': nodelist,
883
      'hypervisor': None,
884
      }
885
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
886
    all_rversion = rpc.call_version(nodelist)
887

    
888
    for node in nodelist:
889
      feedback_fn("* Verifying node %s" % node)
890
      result = self._VerifyNode(node, file_names, local_checksums,
891
                                all_vglist[node], all_nvinfo[node],
892
                                all_rversion[node], feedback_fn)
893
      bad = bad or result
894

    
895
      # node_volume
896
      volumeinfo = all_volumeinfo[node]
897

    
898
      if type(volumeinfo) != dict:
899
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
900
        bad = True
901
        continue
902

    
903
      node_volume[node] = volumeinfo
904

    
905
      # node_instance
906
      nodeinstance = all_instanceinfo[node]
907
      if type(nodeinstance) != list:
908
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
909
        bad = True
910
        continue
911

    
912
      node_instance[node] = nodeinstance
913

    
914
    node_vol_should = {}
915

    
916
    for instance in instancelist:
917
      feedback_fn("* Verifying instance %s" % instance)
918
      result =  self._VerifyInstance(instance, node_volume, node_instance,
919
                                     feedback_fn)
920
      bad = bad or result
921

    
922
      inst_config = self.cfg.GetInstanceInfo(instance)
923

    
924
      inst_config.MapLVsByNode(node_vol_should)
925

    
926
    feedback_fn("* Verifying orphan volumes")
927
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
928
                                       feedback_fn)
929
    bad = bad or result
930

    
931
    feedback_fn("* Verifying remaining instances")
932
    result = self._VerifyOrphanInstances(instancelist, node_instance,
933
                                         feedback_fn)
934
    bad = bad or result
935

    
936
    return int(bad)
937

    
938

    
939
class LURenameCluster(LogicalUnit):
940
  """Rename the cluster.
941

942
  """
943
  HPATH = "cluster-rename"
944
  HTYPE = constants.HTYPE_CLUSTER
945
  _OP_REQP = ["name"]
946

    
947
  def BuildHooksEnv(self):
948
    """Build hooks env.
949

950
    """
951
    env = {
952
      "OP_TARGET": self.op.sstore.GetClusterName(),
953
      "NEW_NAME": self.op.name,
954
      }
955
    mn = self.sstore.GetMasterNode()
956
    return env, [mn], [mn]
957

    
958
  def CheckPrereq(self):
959
    """Verify that the passed name is a valid one.
960

961
    """
962
    hostname = utils.HostInfo(self.op.name)
963

    
964
    new_name = hostname.name
965
    self.ip = new_ip = hostname.ip
966
    old_name = self.sstore.GetClusterName()
967
    old_ip = self.sstore.GetMasterIP()
968
    if new_name == old_name and new_ip == old_ip:
969
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
970
                                 " cluster has changed")
971
    if new_ip != old_ip:
972
      result = utils.RunCmd(["fping", "-q", new_ip])
973
      if not result.failed:
974
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
975
                                   " reachable on the network. Aborting." %
976
                                   new_ip)
977

    
978
    self.op.name = new_name
979

    
980
  def Exec(self, feedback_fn):
981
    """Rename the cluster.
982

983
    """
984
    clustername = self.op.name
985
    ip = self.ip
986
    ss = self.sstore
987

    
988
    # shutdown the master IP
989
    master = ss.GetMasterNode()
990
    if not rpc.call_node_stop_master(master):
991
      raise errors.OpExecError("Could not disable the master role")
992

    
993
    try:
994
      # modify the sstore
995
      ss.SetKey(ss.SS_MASTER_IP, ip)
996
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
997

    
998
      # Distribute updated ss config to all nodes
999
      myself = self.cfg.GetNodeInfo(master)
1000
      dist_nodes = self.cfg.GetNodeList()
1001
      if myself.name in dist_nodes:
1002
        dist_nodes.remove(myself.name)
1003

    
1004
      logger.Debug("Copying updated ssconf data to all nodes")
1005
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1006
        fname = ss.KeyToFilename(keyname)
1007
        result = rpc.call_upload_file(dist_nodes, fname)
1008
        for to_node in dist_nodes:
1009
          if not result[to_node]:
1010
            logger.Error("copy of file %s to node %s failed" %
1011
                         (fname, to_node))
1012
    finally:
1013
      if not rpc.call_node_start_master(master):
1014
        logger.Error("Could not re-enable the master role on the master,\n"
1015
                     "please restart manually.")
1016

    
1017

    
1018
def _WaitForSync(cfgw, instance, oneshot=False, unlock=False):
1019
  """Sleep and poll for an instance's disk to sync.
1020

1021
  """
1022
  if not instance.disks:
1023
    return True
1024

    
1025
  if not oneshot:
1026
    logger.ToStdout("Waiting for instance %s to sync disks." % instance.name)
1027

    
1028
  node = instance.primary_node
1029

    
1030
  for dev in instance.disks:
1031
    cfgw.SetDiskID(dev, node)
1032

    
1033
  retries = 0
1034
  while True:
1035
    max_time = 0
1036
    done = True
1037
    cumul_degraded = False
1038
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1039
    if not rstats:
1040
      logger.ToStderr("Can't get any data from node %s" % node)
1041
      retries += 1
1042
      if retries >= 10:
1043
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1044
                                 " aborting." % node)
1045
      time.sleep(6)
1046
      continue
1047
    retries = 0
1048
    for i in range(len(rstats)):
1049
      mstat = rstats[i]
1050
      if mstat is None:
1051
        logger.ToStderr("Can't compute data for node %s/%s" %
1052
                        (node, instance.disks[i].iv_name))
1053
        continue
1054
      # we ignore the ldisk parameter
1055
      perc_done, est_time, is_degraded, _ = mstat
1056
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1057
      if perc_done is not None:
1058
        done = False
1059
        if est_time is not None:
1060
          rem_time = "%d estimated seconds remaining" % est_time
1061
          max_time = est_time
1062
        else:
1063
          rem_time = "no time estimate"
1064
        logger.ToStdout("- device %s: %5.2f%% done, %s" %
1065
                        (instance.disks[i].iv_name, perc_done, rem_time))
1066
    if done or oneshot:
1067
      break
1068

    
1069
    if unlock:
1070
      utils.Unlock('cmd')
1071
    try:
1072
      time.sleep(min(60, max_time))
1073
    finally:
1074
      if unlock:
1075
        utils.Lock('cmd')
1076

    
1077
  if done:
1078
    logger.ToStdout("Instance %s's disks are in sync." % instance.name)
1079
  return not cumul_degraded
1080

    
1081

    
1082
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1083
  """Check that mirrors are not degraded.
1084

1085
  The ldisk parameter, if True, will change the test from the
1086
  is_degraded attribute (which represents overall non-ok status for
1087
  the device(s)) to the ldisk (representing the local storage status).
1088

1089
  """
1090
  cfgw.SetDiskID(dev, node)
1091
  if ldisk:
1092
    idx = 6
1093
  else:
1094
    idx = 5
1095

    
1096
  result = True
1097
  if on_primary or dev.AssembleOnSecondary():
1098
    rstats = rpc.call_blockdev_find(node, dev)
1099
    if not rstats:
1100
      logger.ToStderr("Can't get any data from node %s" % node)
1101
      result = False
1102
    else:
1103
      result = result and (not rstats[idx])
1104
  if dev.children:
1105
    for child in dev.children:
1106
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1107

    
1108
  return result
1109

    
1110

    
1111
class LUDiagnoseOS(NoHooksLU):
1112
  """Logical unit for OS diagnose/query.
1113

1114
  """
1115
  _OP_REQP = []
1116

    
1117
  def CheckPrereq(self):
1118
    """Check prerequisites.
1119

1120
    This always succeeds, since this is a pure query LU.
1121

1122
    """
1123
    return
1124

    
1125
  def Exec(self, feedback_fn):
1126
    """Compute the list of OSes.
1127

1128
    """
1129
    node_list = self.cfg.GetNodeList()
1130
    node_data = rpc.call_os_diagnose(node_list)
1131
    if node_data == False:
1132
      raise errors.OpExecError("Can't gather the list of OSes")
1133
    return node_data
1134

    
1135

    
1136
class LURemoveNode(LogicalUnit):
1137
  """Logical unit for removing a node.
1138

1139
  """
1140
  HPATH = "node-remove"
1141
  HTYPE = constants.HTYPE_NODE
1142
  _OP_REQP = ["node_name"]
1143

    
1144
  def BuildHooksEnv(self):
1145
    """Build hooks env.
1146

1147
    This doesn't run on the target node in the pre phase as a failed
1148
    node would not allows itself to run.
1149

1150
    """
1151
    env = {
1152
      "OP_TARGET": self.op.node_name,
1153
      "NODE_NAME": self.op.node_name,
1154
      }
1155
    all_nodes = self.cfg.GetNodeList()
1156
    all_nodes.remove(self.op.node_name)
1157
    return env, all_nodes, all_nodes
1158

    
1159
  def CheckPrereq(self):
1160
    """Check prerequisites.
1161

1162
    This checks:
1163
     - the node exists in the configuration
1164
     - it does not have primary or secondary instances
1165
     - it's not the master
1166

1167
    Any errors are signalled by raising errors.OpPrereqError.
1168

1169
    """
1170
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1171
    if node is None:
1172
      raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1173

    
1174
    instance_list = self.cfg.GetInstanceList()
1175

    
1176
    masternode = self.sstore.GetMasterNode()
1177
    if node.name == masternode:
1178
      raise errors.OpPrereqError("Node is the master node,"
1179
                                 " you need to failover first.")
1180

    
1181
    for instance_name in instance_list:
1182
      instance = self.cfg.GetInstanceInfo(instance_name)
1183
      if node.name == instance.primary_node:
1184
        raise errors.OpPrereqError("Instance %s still running on the node,"
1185
                                   " please remove first." % instance_name)
1186
      if node.name in instance.secondary_nodes:
1187
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1188
                                   " please remove first." % instance_name)
1189
    self.op.node_name = node.name
1190
    self.node = node
1191

    
1192
  def Exec(self, feedback_fn):
1193
    """Removes the node from the cluster.
1194

1195
    """
1196
    node = self.node
1197
    logger.Info("stopping the node daemon and removing configs from node %s" %
1198
                node.name)
1199

    
1200
    rpc.call_node_leave_cluster(node.name)
1201

    
1202
    ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1203

    
1204
    logger.Info("Removing node %s from config" % node.name)
1205

    
1206
    self.cfg.RemoveNode(node.name)
1207

    
1208

    
1209
class LUQueryNodes(NoHooksLU):
1210
  """Logical unit for querying nodes.
1211

1212
  """
1213
  _OP_REQP = ["output_fields", "names"]
1214

    
1215
  def CheckPrereq(self):
1216
    """Check prerequisites.
1217

1218
    This checks that the fields required are valid output fields.
1219

1220
    """
1221
    self.dynamic_fields = frozenset(["dtotal", "dfree",
1222
                                     "mtotal", "mnode", "mfree",
1223
                                     "bootid"])
1224

    
1225
    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1226
                               "pinst_list", "sinst_list",
1227
                               "pip", "sip"],
1228
                       dynamic=self.dynamic_fields,
1229
                       selected=self.op.output_fields)
1230

    
1231
    self.wanted = _GetWantedNodes(self, self.op.names)
1232

    
1233
  def Exec(self, feedback_fn):
1234
    """Computes the list of nodes and their attributes.
1235

1236
    """
1237
    nodenames = self.wanted
1238
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1239

    
1240
    # begin data gathering
1241

    
1242
    if self.dynamic_fields.intersection(self.op.output_fields):
1243
      live_data = {}
1244
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1245
      for name in nodenames:
1246
        nodeinfo = node_data.get(name, None)
1247
        if nodeinfo:
1248
          live_data[name] = {
1249
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1250
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1251
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1252
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1253
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1254
            "bootid": nodeinfo['bootid'],
1255
            }
1256
        else:
1257
          live_data[name] = {}
1258
    else:
1259
      live_data = dict.fromkeys(nodenames, {})
1260

    
1261
    node_to_primary = dict([(name, set()) for name in nodenames])
1262
    node_to_secondary = dict([(name, set()) for name in nodenames])
1263

    
1264
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1265
                             "sinst_cnt", "sinst_list"))
1266
    if inst_fields & frozenset(self.op.output_fields):
1267
      instancelist = self.cfg.GetInstanceList()
1268

    
1269
      for instance_name in instancelist:
1270
        inst = self.cfg.GetInstanceInfo(instance_name)
1271
        if inst.primary_node in node_to_primary:
1272
          node_to_primary[inst.primary_node].add(inst.name)
1273
        for secnode in inst.secondary_nodes:
1274
          if secnode in node_to_secondary:
1275
            node_to_secondary[secnode].add(inst.name)
1276

    
1277
    # end data gathering
1278

    
1279
    output = []
1280
    for node in nodelist:
1281
      node_output = []
1282
      for field in self.op.output_fields:
1283
        if field == "name":
1284
          val = node.name
1285
        elif field == "pinst_list":
1286
          val = list(node_to_primary[node.name])
1287
        elif field == "sinst_list":
1288
          val = list(node_to_secondary[node.name])
1289
        elif field == "pinst_cnt":
1290
          val = len(node_to_primary[node.name])
1291
        elif field == "sinst_cnt":
1292
          val = len(node_to_secondary[node.name])
1293
        elif field == "pip":
1294
          val = node.primary_ip
1295
        elif field == "sip":
1296
          val = node.secondary_ip
1297
        elif field in self.dynamic_fields:
1298
          val = live_data[node.name].get(field, None)
1299
        else:
1300
          raise errors.ParameterError(field)
1301
        node_output.append(val)
1302
      output.append(node_output)
1303

    
1304
    return output
1305

    
1306

    
1307
class LUQueryNodeVolumes(NoHooksLU):
1308
  """Logical unit for getting volumes on node(s).
1309

1310
  """
1311
  _OP_REQP = ["nodes", "output_fields"]
1312

    
1313
  def CheckPrereq(self):
1314
    """Check prerequisites.
1315

1316
    This checks that the fields required are valid output fields.
1317

1318
    """
1319
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1320

    
1321
    _CheckOutputFields(static=["node"],
1322
                       dynamic=["phys", "vg", "name", "size", "instance"],
1323
                       selected=self.op.output_fields)
1324

    
1325

    
1326
  def Exec(self, feedback_fn):
1327
    """Computes the list of nodes and their attributes.
1328

1329
    """
1330
    nodenames = self.nodes
1331
    volumes = rpc.call_node_volumes(nodenames)
1332

    
1333
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1334
             in self.cfg.GetInstanceList()]
1335

    
1336
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1337

    
1338
    output = []
1339
    for node in nodenames:
1340
      if node not in volumes or not volumes[node]:
1341
        continue
1342

    
1343
      node_vols = volumes[node][:]
1344
      node_vols.sort(key=lambda vol: vol['dev'])
1345

    
1346
      for vol in node_vols:
1347
        node_output = []
1348
        for field in self.op.output_fields:
1349
          if field == "node":
1350
            val = node
1351
          elif field == "phys":
1352
            val = vol['dev']
1353
          elif field == "vg":
1354
            val = vol['vg']
1355
          elif field == "name":
1356
            val = vol['name']
1357
          elif field == "size":
1358
            val = int(float(vol['size']))
1359
          elif field == "instance":
1360
            for inst in ilist:
1361
              if node not in lv_by_node[inst]:
1362
                continue
1363
              if vol['name'] in lv_by_node[inst][node]:
1364
                val = inst.name
1365
                break
1366
            else:
1367
              val = '-'
1368
          else:
1369
            raise errors.ParameterError(field)
1370
          node_output.append(str(val))
1371

    
1372
        output.append(node_output)
1373

    
1374
    return output
1375

    
1376

    
1377
class LUAddNode(LogicalUnit):
1378
  """Logical unit for adding node to the cluster.
1379

1380
  """
1381
  HPATH = "node-add"
1382
  HTYPE = constants.HTYPE_NODE
1383
  _OP_REQP = ["node_name"]
1384

    
1385
  def BuildHooksEnv(self):
1386
    """Build hooks env.
1387

1388
    This will run on all nodes before, and on all nodes + the new node after.
1389

1390
    """
1391
    env = {
1392
      "OP_TARGET": self.op.node_name,
1393
      "NODE_NAME": self.op.node_name,
1394
      "NODE_PIP": self.op.primary_ip,
1395
      "NODE_SIP": self.op.secondary_ip,
1396
      }
1397
    nodes_0 = self.cfg.GetNodeList()
1398
    nodes_1 = nodes_0 + [self.op.node_name, ]
1399
    return env, nodes_0, nodes_1
1400

    
1401
  def CheckPrereq(self):
1402
    """Check prerequisites.
1403

1404
    This checks:
1405
     - the new node is not already in the config
1406
     - it is resolvable
1407
     - its parameters (single/dual homed) matches the cluster
1408

1409
    Any errors are signalled by raising errors.OpPrereqError.
1410

1411
    """
1412
    node_name = self.op.node_name
1413
    cfg = self.cfg
1414

    
1415
    dns_data = utils.HostInfo(node_name)
1416

    
1417
    node = dns_data.name
1418
    primary_ip = self.op.primary_ip = dns_data.ip
1419
    secondary_ip = getattr(self.op, "secondary_ip", None)
1420
    if secondary_ip is None:
1421
      secondary_ip = primary_ip
1422
    if not utils.IsValidIP(secondary_ip):
1423
      raise errors.OpPrereqError("Invalid secondary IP given")
1424
    self.op.secondary_ip = secondary_ip
1425
    node_list = cfg.GetNodeList()
1426
    if node in node_list:
1427
      raise errors.OpPrereqError("Node %s is already in the configuration"
1428
                                 % node)
1429

    
1430
    for existing_node_name in node_list:
1431
      existing_node = cfg.GetNodeInfo(existing_node_name)
1432
      if (existing_node.primary_ip == primary_ip or
1433
          existing_node.secondary_ip == primary_ip or
1434
          existing_node.primary_ip == secondary_ip or
1435
          existing_node.secondary_ip == secondary_ip):
1436
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1437
                                   " existing node %s" % existing_node.name)
1438

    
1439
    # check that the type of the node (single versus dual homed) is the
1440
    # same as for the master
1441
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1442
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1443
    newbie_singlehomed = secondary_ip == primary_ip
1444
    if master_singlehomed != newbie_singlehomed:
1445
      if master_singlehomed:
1446
        raise errors.OpPrereqError("The master has no private ip but the"
1447
                                   " new node has one")
1448
      else:
1449
        raise errors.OpPrereqError("The master has a private ip but the"
1450
                                   " new node doesn't have one")
1451

    
1452
    # checks reachablity
1453
    if not utils.TcpPing(utils.HostInfo().name,
1454
                         primary_ip,
1455
                         constants.DEFAULT_NODED_PORT):
1456
      raise errors.OpPrereqError("Node not reachable by ping")
1457

    
1458
    if not newbie_singlehomed:
1459
      # check reachability from my secondary ip to newbie's secondary ip
1460
      if not utils.TcpPing(myself.secondary_ip,
1461
                           secondary_ip,
1462
                           constants.DEFAULT_NODED_PORT):
1463
        raise errors.OpPrereqError(
1464
          "Node secondary ip not reachable by TCP based ping to noded port")
1465

    
1466
    self.new_node = objects.Node(name=node,
1467
                                 primary_ip=primary_ip,
1468
                                 secondary_ip=secondary_ip)
1469

    
1470
  def Exec(self, feedback_fn):
1471
    """Adds the new node to the cluster.
1472

1473
    """
1474
    new_node = self.new_node
1475
    node = new_node.name
1476

    
1477
    # set up inter-node password and certificate and restarts the node daemon
1478
    gntpass = self.sstore.GetNodeDaemonPassword()
1479
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1480
      raise errors.OpExecError("ganeti password corruption detected")
1481
    f = open(constants.SSL_CERT_FILE)
1482
    try:
1483
      gntpem = f.read(8192)
1484
    finally:
1485
      f.close()
1486
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1487
    # so we use this to detect an invalid certificate; as long as the
1488
    # cert doesn't contain this, the here-document will be correctly
1489
    # parsed by the shell sequence below
1490
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
1491
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1492
    if not gntpem.endswith("\n"):
1493
      raise errors.OpExecError("PEM must end with newline")
1494
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1495

    
1496
    # and then connect with ssh to set password and start ganeti-noded
1497
    # note that all the below variables are sanitized at this point,
    # either by being constants or by the checks above
    ss = self.sstore
    mycommand = ("umask 077 && "
                 "echo '%s' > '%s' && "
                 "cat > '%s' << '!EOF.' && \n"
                 "%s!EOF.\n%s restart" %
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
                  constants.SSL_CERT_FILE, gntpem,
                  constants.NODE_INITD_SCRIPT))

    result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
    if result.failed:
      raise errors.OpExecError("Remote command on node %s, error: %s,"
                               " output: %s" %
                               (node, result.fail_reason, result.output))

    # check connectivity
    time.sleep(4)

    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    _UpdateEtcHosts(new_node.name, new_node.primary_ip)
    _UpdateKnownHosts(new_node.name, new_node.primary_ip,
                      self.cfg.GetHostKey())

    if new_node.secondary_ip != new_node.primary_ip:
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the"
                                 " secondary ip you gave (%s).\n"
                                 "Please fix and re-run this command." %
                                 new_node.secondary_ip)

    success, msg = ssh.VerifyNodeHostname(node)
    if not success:
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
                               " than the one the resolver gives: %s.\n"
                               "Please fix and re-run this command." %
                               (node, msg))

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList() + [node]
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = ss.GetFileList()
    for fname in to_copy:
      if not ssh.CopyFileToNode(node, fname):
        logger.Error("could not copy file %s to node %s" % (fname, node))

    logger.Info("adding node %s to cluster.conf" % node)
    self.cfg.AddNode(new_node)

    
1596

    
1597
class LUMasterFailover(LogicalUnit):
1598
  """Failover the master node to the current node.
1599

1600
  This is a special LU in that it must run on a non-master node.
1601

1602
  """
1603
  HPATH = "master-failover"
1604
  HTYPE = constants.HTYPE_CLUSTER
1605
  REQ_MASTER = False
1606
  _OP_REQP = []
1607

    
1608
  def BuildHooksEnv(self):
1609
    """Build hooks env.
1610

1611
    This will run on the new master only in the pre phase, and on all
1612
    the nodes in the post phase.
1613

1614
    """
1615
    env = {
1616
      "OP_TARGET": self.new_master,
1617
      "NEW_MASTER": self.new_master,
1618
      "OLD_MASTER": self.old_master,
1619
      }
1620
    return env, [self.new_master], self.cfg.GetNodeList()
1621

    
1622
  def CheckPrereq(self):
1623
    """Check prerequisites.
1624

1625
    This checks that we are not already the master.
1626

1627
    """
1628
    self.new_master = utils.HostInfo().name
1629
    self.old_master = self.sstore.GetMasterNode()
1630

    
1631
    if self.old_master == self.new_master:
1632
      raise errors.OpPrereqError("This commands must be run on the node"
1633
                                 " where you want the new master to be.\n"
1634
                                 "%s is already the master" %
1635
                                 self.old_master)
1636

    
1637
  def Exec(self, feedback_fn):
1638
    """Failover the master node.
1639

1640
    This command, when run on a non-master node, will cause the current
1641
    master to cease being master, and the non-master to become new
1642
    master.
1643

1644
    """
1645
    #TODO: do not rely on gethostname returning the FQDN
1646
    logger.Info("setting master to %s, old master: %s" %
1647
                (self.new_master, self.old_master))
1648

    
1649
    if not rpc.call_node_stop_master(self.old_master):
      logger.Error("could not disable the master role on the old master"
                   " %s, please disable manually" % self.old_master)
1652

    
1653
    ss = self.sstore
1654
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1655
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
1656
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
1657
      logger.Error("could not distribute the new simple store master file"
1658
                   " to the other nodes, please check.")
1659

    
1660
    if not rpc.call_node_start_master(self.new_master):
1661
      logger.Error("could not start the master role on the new master"
1662
                   " %s, please check" % self.new_master)
1663
      feedback_fn("Error in activating the master IP on the new master,\n"
1664
                  "please fix manually.")
1665

    
1666

    
1667

    
1668
class LUQueryClusterInfo(NoHooksLU):
1669
  """Query cluster configuration.
1670

1671
  """
1672
  _OP_REQP = []
1673
  REQ_MASTER = False
1674

    
1675
  def CheckPrereq(self):
1676
    """No prerequsites needed for this LU.
1677

1678
    """
1679
    pass
1680

    
1681
  def Exec(self, feedback_fn):
1682
    """Return cluster config.
1683

1684
    """
1685
    result = {
1686
      "name": self.sstore.GetClusterName(),
1687
      "software_version": constants.RELEASE_VERSION,
1688
      "protocol_version": constants.PROTOCOL_VERSION,
1689
      "config_version": constants.CONFIG_VERSION,
1690
      "os_api_version": constants.OS_API_VERSION,
1691
      "export_version": constants.EXPORT_VERSION,
1692
      "master": self.sstore.GetMasterNode(),
1693
      "architecture": (platform.architecture()[0], platform.machine()),
1694
      }
1695

    
1696
    return result
1697

    
1698

    
1699
class LUClusterCopyFile(NoHooksLU):
1700
  """Copy file to cluster.
1701

1702
  """
1703
  _OP_REQP = ["nodes", "filename"]
1704

    
1705
  def CheckPrereq(self):
1706
    """Check prerequisites.
1707

1708
    It should check that the named file exists and that the given list
1709
    of nodes is valid.
1710

1711
    """
1712
    if not os.path.exists(self.op.filename):
1713
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1714

    
1715
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1716

    
1717
  def Exec(self, feedback_fn):
1718
    """Copy a file from master to some nodes.
1719

1720
    Args:
1721
      opts - class with options as members
1722
      args - list containing a single element, the file name
1723
    Opts used:
1724
      nodes - list containing the name of target nodes; if empty, all nodes
1725

1726
    """
1727
    filename = self.op.filename
1728

    
1729
    myname = utils.HostInfo().name
1730

    
1731
    for node in self.nodes:
1732
      if node == myname:
1733
        continue
1734
      if not ssh.CopyFileToNode(node, filename):
1735
        logger.Error("Copy of file %s to node %s failed" % (filename, node))
1736

    
1737

    
1738
class LUDumpClusterConfig(NoHooksLU):
1739
  """Return a text-representation of the cluster-config.
1740

1741
  """
1742
  _OP_REQP = []
1743

    
1744
  def CheckPrereq(self):
1745
    """No prerequisites.
1746

1747
    """
1748
    pass
1749

    
1750
  def Exec(self, feedback_fn):
1751
    """Dump a representation of the cluster config to the standard output.
1752

1753
    """
1754
    return self.cfg.DumpConfig()
1755

    
1756

    
1757
class LURunClusterCommand(NoHooksLU):
1758
  """Run a command on some nodes.
1759

1760
  """
1761
  _OP_REQP = ["command", "nodes"]
1762

    
1763
  def CheckPrereq(self):
1764
    """Check prerequisites.
1765

1766
    It checks that the given list of nodes is valid.
1767

1768
    """
1769
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1770

    
1771
  def Exec(self, feedback_fn):
1772
    """Run a command on some nodes.
1773

1774
    """
1775
    data = []
1776
    for node in self.nodes:
1777
      result = ssh.SSHCall(node, "root", self.op.command)
1778
      data.append((node, result.output, result.exit_code))
1779

    
1780
    return data
1781

    
1782

    
1783
class LUActivateInstanceDisks(NoHooksLU):
1784
  """Bring up an instance's disks.
1785

1786
  """
1787
  _OP_REQP = ["instance_name"]
1788

    
1789
  def CheckPrereq(self):
1790
    """Check prerequisites.
1791

1792
    This checks that the instance is in the cluster.
1793

1794
    """
1795
    instance = self.cfg.GetInstanceInfo(
1796
      self.cfg.ExpandInstanceName(self.op.instance_name))
1797
    if instance is None:
1798
      raise errors.OpPrereqError("Instance '%s' not known" %
1799
                                 self.op.instance_name)
1800
    self.instance = instance
1801

    
1802

    
1803
  def Exec(self, feedback_fn):
1804
    """Activate the disks.
1805

1806
    """
1807
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1808
    if not disks_ok:
1809
      raise errors.OpExecError("Cannot activate block devices")
1810

    
1811
    return disks_info
1812

    
1813

    
1814
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    false if the operation failed
    list of (host, instance_visible_name, node_visible_name) if the operation
         succeeded with the mapping from node devices to instance devices
  """
  device_info = []
  disks_ok = True
  for inst_disk in instance.disks:
    master_result = None
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      is_primary = node == instance.primary_node
      result = rpc.call_blockdev_assemble(node, node_disk,
                                          instance.name, is_primary)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=%s)" %
                     (inst_disk.iv_name, node, is_primary))
        if is_primary or not ignore_secondaries:
          disks_ok = False
      if is_primary:
        master_result = result
    device_info.append((instance.primary_node, inst_disk.iv_name,
                        master_result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info
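  # Illustrative note on the return value (example values, not from a real
  # cluster): for a two-disk instance with primary node "node1.example.com"
  # the function returns something like
  #   (True, [("node1.example.com", "sda", <assemble result>),
  #           ("node1.example.com", "sdb", <assemble result>)])
  # where each third element is whatever call_blockdev_assemble returned on
  # the primary node for that disk.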
1855

    
1856

    
1857
def _StartInstanceDisks(cfg, instance, force):
1858
  """Start the disks of an instance.
1859

1860
  """
1861
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1862
                                           ignore_secondaries=force)
1863
  if not disks_ok:
1864
    _ShutdownInstanceDisks(instance, cfg)
1865
    if force is not None and not force:
1866
      logger.Error("If the message above refers to a secondary node,"
1867
                   " you can retry the operation using '--force'.")
1868
    raise errors.OpExecError("Disk consistency error")
1869

    
1870

    
1871
class LUDeactivateInstanceDisks(NoHooksLU):
1872
  """Shutdown an instance's disks.
1873

1874
  """
1875
  _OP_REQP = ["instance_name"]
1876

    
1877
  def CheckPrereq(self):
1878
    """Check prerequisites.
1879

1880
    This checks that the instance is in the cluster.
1881

1882
    """
1883
    instance = self.cfg.GetInstanceInfo(
1884
      self.cfg.ExpandInstanceName(self.op.instance_name))
1885
    if instance is None:
1886
      raise errors.OpPrereqError("Instance '%s' not known" %
1887
                                 self.op.instance_name)
1888
    self.instance = instance
1889

    
1890
  def Exec(self, feedback_fn):
1891
    """Deactivate the disks
1892

1893
    """
1894
    instance = self.instance
1895
    ins_l = rpc.call_instance_list([instance.primary_node])
1896
    ins_l = ins_l[instance.primary_node]
1897
    if not type(ins_l) is list:
1898
      raise errors.OpExecError("Can't contact node '%s'" %
1899
                               instance.primary_node)
1900

    
1901
    if self.instance.name in ins_l:
1902
      raise errors.OpExecError("Instance is running, can't shutdown"
1903
                               " block devices.")
1904

    
1905
    _ShutdownInstanceDisks(instance, self.cfg)
1906

    
1907

    
1908
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1909
  """Shutdown block devices of an instance.
1910

1911
  This does the shutdown on all nodes of the instance.
1912

1913
  If ignore_primary is true, errors on the primary node are
  ignored.
1915

1916
  """
1917
  result = True
1918
  for disk in instance.disks:
1919
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1920
      cfg.SetDiskID(top_disk, node)
1921
      if not rpc.call_blockdev_shutdown(node, top_disk):
1922
        logger.Error("could not shutdown block device %s on node %s" %
1923
                     (disk.iv_name, node))
1924
        if not ignore_primary or node != instance.primary_node:
1925
          result = False
1926
  return result
1927

    
1928

    
1929
class LUStartupInstance(LogicalUnit):
1930
  """Starts an instance.
1931

1932
  """
1933
  HPATH = "instance-start"
1934
  HTYPE = constants.HTYPE_INSTANCE
1935
  _OP_REQP = ["instance_name", "force"]
1936

    
1937
  def BuildHooksEnv(self):
1938
    """Build hooks env.
1939

1940
    This runs on master, primary and secondary nodes of the instance.
1941

1942
    """
1943
    env = {
1944
      "FORCE": self.op.force,
1945
      }
1946
    env.update(_BuildInstanceHookEnvByObject(self.instance))
1947
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1948
          list(self.instance.secondary_nodes))
1949
    return env, nl, nl
1950

    
1951
  def CheckPrereq(self):
1952
    """Check prerequisites.
1953

1954
    This checks that the instance is in the cluster.
1955

1956
    """
1957
    instance = self.cfg.GetInstanceInfo(
1958
      self.cfg.ExpandInstanceName(self.op.instance_name))
1959
    if instance is None:
1960
      raise errors.OpPrereqError("Instance '%s' not known" %
1961
                                 self.op.instance_name)
1962

    
1963
    # check bridges existance
1964
    _CheckInstanceBridgesExist(instance)
1965

    
1966
    self.instance = instance
1967
    self.op.instance_name = instance.name
1968

    
1969
  def Exec(self, feedback_fn):
1970
    """Start the instance.
1971

1972
    """
1973
    instance = self.instance
1974
    force = self.op.force
1975
    extra_args = getattr(self.op, "extra_args", "")
1976

    
1977
    node_current = instance.primary_node
1978

    
1979
    nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
1980
    if not nodeinfo:
1981
      raise errors.OpExecError("Could not contact node %s for infos" %
1982
                               (node_current))
1983

    
1984
    freememory = nodeinfo[node_current]['memory_free']
1985
    memory = instance.memory
1986
    if memory > freememory:
1987
      raise errors.OpExecError("Not enough memory to start instance"
1988
                               " %s on node %s"
1989
                               " needed %s MiB, available %s MiB" %
1990
                               (instance.name, node_current, memory,
1991
                                freememory))
1992

    
1993
    _StartInstanceDisks(self.cfg, instance, force)
1994

    
1995
    if not rpc.call_instance_start(node_current, instance, extra_args):
1996
      _ShutdownInstanceDisks(instance, self.cfg)
1997
      raise errors.OpExecError("Could not start instance")
1998

    
1999
    self.cfg.MarkInstanceUp(instance.name)
2000

    
2001

    
2002
class LURebootInstance(LogicalUnit):
2003
  """Reboot an instance.
2004

2005
  """
2006
  HPATH = "instance-reboot"
2007
  HTYPE = constants.HTYPE_INSTANCE
2008
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2009

    
2010
  def BuildHooksEnv(self):
2011
    """Build hooks env.
2012

2013
    This runs on master, primary and secondary nodes of the instance.
2014

2015
    """
2016
    env = {
2017
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2018
      }
2019
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2020
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2021
          list(self.instance.secondary_nodes))
2022
    return env, nl, nl
2023

    
2024
  def CheckPrereq(self):
2025
    """Check prerequisites.
2026

2027
    This checks that the instance is in the cluster.
2028

2029
    """
2030
    instance = self.cfg.GetInstanceInfo(
2031
      self.cfg.ExpandInstanceName(self.op.instance_name))
2032
    if instance is None:
2033
      raise errors.OpPrereqError("Instance '%s' not known" %
2034
                                 self.op.instance_name)
2035

    
2036
    # check bridges existance
2037
    _CheckInstanceBridgesExist(instance)
2038

    
2039
    self.instance = instance
2040
    self.op.instance_name = instance.name
2041

    
2042
  def Exec(self, feedback_fn):
2043
    """Reboot the instance.
2044

2045
    """
2046
    instance = self.instance
2047
    ignore_secondaries = self.op.ignore_secondaries
2048
    reboot_type = self.op.reboot_type
2049
    extra_args = getattr(self.op, "extra_args", "")
2050

    
2051
    node_current = instance.primary_node
2052

    
2053
    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2054
                           constants.INSTANCE_REBOOT_HARD,
2055
                           constants.INSTANCE_REBOOT_FULL]:
2056
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2057
                                  (constants.INSTANCE_REBOOT_SOFT,
2058
                                   constants.INSTANCE_REBOOT_HARD,
2059
                                   constants.INSTANCE_REBOOT_FULL))
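    # The three accepted values map to the behaviour implemented below:
    # INSTANCE_REBOOT_SOFT and INSTANCE_REBOOT_HARD are handed to the node
    # daemon via call_instance_reboot, while INSTANCE_REBOOT_FULL is done
    # from here as instance shutdown, disk deactivation/reactivation and a
    # fresh instance start.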
2060

    
2061
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2062
                       constants.INSTANCE_REBOOT_HARD]:
2063
      if not rpc.call_instance_reboot(node_current, instance,
2064
                                      reboot_type, extra_args):
2065
        raise errors.OpExecError("Could not reboot instance")
2066
    else:
2067
      if not rpc.call_instance_shutdown(node_current, instance):
2068
        raise errors.OpExecError("could not shutdown instance for full reboot")
2069
      _ShutdownInstanceDisks(instance, self.cfg)
2070
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2071
      if not rpc.call_instance_start(node_current, instance, extra_args):
2072
        _ShutdownInstanceDisks(instance, self.cfg)
2073
        raise errors.OpExecError("Could not start instance for full reboot")
2074

    
2075
    self.cfg.MarkInstanceUp(instance.name)
2076

    
2077

    
2078
class LUShutdownInstance(LogicalUnit):
2079
  """Shutdown an instance.
2080

2081
  """
2082
  HPATH = "instance-stop"
2083
  HTYPE = constants.HTYPE_INSTANCE
2084
  _OP_REQP = ["instance_name"]
2085

    
2086
  def BuildHooksEnv(self):
2087
    """Build hooks env.
2088

2089
    This runs on master, primary and secondary nodes of the instance.
2090

2091
    """
2092
    env = _BuildInstanceHookEnvByObject(self.instance)
2093
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2094
          list(self.instance.secondary_nodes))
2095
    return env, nl, nl
2096

    
2097
  def CheckPrereq(self):
2098
    """Check prerequisites.
2099

2100
    This checks that the instance is in the cluster.
2101

2102
    """
2103
    instance = self.cfg.GetInstanceInfo(
2104
      self.cfg.ExpandInstanceName(self.op.instance_name))
2105
    if instance is None:
2106
      raise errors.OpPrereqError("Instance '%s' not known" %
2107
                                 self.op.instance_name)
2108
    self.instance = instance
2109

    
2110
  def Exec(self, feedback_fn):
2111
    """Shutdown the instance.
2112

2113
    """
2114
    instance = self.instance
2115
    node_current = instance.primary_node
2116
    if not rpc.call_instance_shutdown(node_current, instance):
2117
      logger.Error("could not shutdown instance")
2118

    
2119
    self.cfg.MarkInstanceDown(instance.name)
2120
    _ShutdownInstanceDisks(instance, self.cfg)
2121

    
2122

    
2123
class LUReinstallInstance(LogicalUnit):
2124
  """Reinstall an instance.
2125

2126
  """
2127
  HPATH = "instance-reinstall"
2128
  HTYPE = constants.HTYPE_INSTANCE
2129
  _OP_REQP = ["instance_name"]
2130

    
2131
  def BuildHooksEnv(self):
2132
    """Build hooks env.
2133

2134
    This runs on master, primary and secondary nodes of the instance.
2135

2136
    """
2137
    env = _BuildInstanceHookEnvByObject(self.instance)
2138
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2139
          list(self.instance.secondary_nodes))
2140
    return env, nl, nl
2141

    
2142
  def CheckPrereq(self):
2143
    """Check prerequisites.
2144

2145
    This checks that the instance is in the cluster and is not running.
2146

2147
    """
2148
    instance = self.cfg.GetInstanceInfo(
2149
      self.cfg.ExpandInstanceName(self.op.instance_name))
2150
    if instance is None:
2151
      raise errors.OpPrereqError("Instance '%s' not known" %
2152
                                 self.op.instance_name)
2153
    if instance.disk_template == constants.DT_DISKLESS:
2154
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2155
                                 self.op.instance_name)
2156
    if instance.status != "down":
2157
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2158
                                 self.op.instance_name)
2159
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2160
    if remote_info:
2161
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2162
                                 (self.op.instance_name,
2163
                                  instance.primary_node))
2164

    
2165
    self.op.os_type = getattr(self.op, "os_type", None)
2166
    if self.op.os_type is not None:
2167
      # OS verification
2168
      pnode = self.cfg.GetNodeInfo(
2169
        self.cfg.ExpandNodeName(instance.primary_node))
2170
      if pnode is None:
2171
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2172
                                   self.op.pnode)
2173
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2174
      if not os_obj:
2175
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2176
                                   " primary node"  % self.op.os_type)
2177

    
2178
    self.instance = instance
2179

    
2180
  def Exec(self, feedback_fn):
2181
    """Reinstall the instance.
2182

2183
    """
2184
    inst = self.instance
2185

    
2186
    if self.op.os_type is not None:
2187
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2188
      inst.os = self.op.os_type
2189
      self.cfg.AddInstance(inst)
2190

    
2191
    _StartInstanceDisks(self.cfg, inst, None)
2192
    try:
2193
      feedback_fn("Running the instance OS create scripts...")
2194
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2195
        raise errors.OpExecError("Could not install OS for instance %s "
2196
                                 "on node %s" %
2197
                                 (inst.name, inst.primary_node))
2198
    finally:
2199
      _ShutdownInstanceDisks(inst, self.cfg)
2200

    
2201

    
2202
class LURenameInstance(LogicalUnit):
2203
  """Rename an instance.
2204

2205
  """
2206
  HPATH = "instance-rename"
2207
  HTYPE = constants.HTYPE_INSTANCE
2208
  _OP_REQP = ["instance_name", "new_name"]
2209

    
2210
  def BuildHooksEnv(self):
2211
    """Build hooks env.
2212

2213
    This runs on master, primary and secondary nodes of the instance.
2214

2215
    """
2216
    env = _BuildInstanceHookEnvByObject(self.instance)
2217
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2218
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2219
          list(self.instance.secondary_nodes))
2220
    return env, nl, nl
2221

    
2222
  def CheckPrereq(self):
2223
    """Check prerequisites.
2224

2225
    This checks that the instance is in the cluster and is not running.
2226

2227
    """
2228
    instance = self.cfg.GetInstanceInfo(
2229
      self.cfg.ExpandInstanceName(self.op.instance_name))
2230
    if instance is None:
2231
      raise errors.OpPrereqError("Instance '%s' not known" %
2232
                                 self.op.instance_name)
2233
    if instance.status != "down":
2234
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2235
                                 self.op.instance_name)
2236
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2237
    if remote_info:
2238
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2239
                                 (self.op.instance_name,
2240
                                  instance.primary_node))
2241
    self.instance = instance
2242

    
2243
    # new name verification
2244
    name_info = utils.HostInfo(self.op.new_name)
2245

    
2246
    self.op.new_name = new_name = name_info.name
2247
    if not getattr(self.op, "ignore_ip", False):
2248
      command = ["fping", "-q", name_info.ip]
2249
      result = utils.RunCmd(command)
2250
      if not result.failed:
2251
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2252
                                   (name_info.ip, new_name))
2253

    
2254

    
2255
  def Exec(self, feedback_fn):
2256
    """Reinstall the instance.
2257

2258
    """
2259
    inst = self.instance
2260
    old_name = inst.name
2261

    
2262
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2263

    
2264
    # re-read the instance from the configuration after rename
2265
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2266

    
2267
    _StartInstanceDisks(self.cfg, inst, None)
2268
    try:
2269
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2270
                                          "sda", "sdb"):
2271
        msg = ("Could run OS rename script for instance %s\n"
2272
               "on node %s\n"
2273
               "(but the instance has been renamed in Ganeti)" %
2274
               (inst.name, inst.primary_node))
2275
        logger.Error(msg)
2276
    finally:
2277
      _ShutdownInstanceDisks(inst, self.cfg)
2278

    
2279

    
2280
class LURemoveInstance(LogicalUnit):
2281
  """Remove an instance.
2282

2283
  """
2284
  HPATH = "instance-remove"
2285
  HTYPE = constants.HTYPE_INSTANCE
2286
  _OP_REQP = ["instance_name"]
2287

    
2288
  def BuildHooksEnv(self):
2289
    """Build hooks env.
2290

2291
    This runs on master, primary and secondary nodes of the instance.
2292

2293
    """
2294
    env = _BuildInstanceHookEnvByObject(self.instance)
2295
    nl = [self.sstore.GetMasterNode()]
2296
    return env, nl, nl
2297

    
2298
  def CheckPrereq(self):
2299
    """Check prerequisites.
2300

2301
    This checks that the instance is in the cluster.
2302

2303
    """
2304
    instance = self.cfg.GetInstanceInfo(
2305
      self.cfg.ExpandInstanceName(self.op.instance_name))
2306
    if instance is None:
2307
      raise errors.OpPrereqError("Instance '%s' not known" %
2308
                                 self.op.instance_name)
2309
    self.instance = instance
2310

    
2311
  def Exec(self, feedback_fn):
2312
    """Remove the instance.
2313

2314
    """
2315
    instance = self.instance
2316
    logger.Info("shutting down instance %s on node %s" %
2317
                (instance.name, instance.primary_node))
2318

    
2319
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2320
      if self.op.ignore_failures:
2321
        feedback_fn("Warning: can't shutdown instance")
2322
      else:
2323
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2324
                                 (instance.name, instance.primary_node))
2325

    
2326
    logger.Info("removing block devices for instance %s" % instance.name)
2327

    
2328
    if not _RemoveDisks(instance, self.cfg):
2329
      if self.op.ignore_failures:
2330
        feedback_fn("Warning: can't remove instance's disks")
2331
      else:
2332
        raise errors.OpExecError("Can't remove instance's disks")
2333

    
2334
    logger.Info("removing instance %s out of cluster config" % instance.name)
2335

    
2336
    self.cfg.RemoveInstance(instance.name)
2337

    
2338

    
2339
class LUQueryInstances(NoHooksLU):
2340
  """Logical unit for querying instances.
2341

2342
  """
2343
  _OP_REQP = ["output_fields", "names"]
2344

    
2345
  def CheckPrereq(self):
2346
    """Check prerequisites.
2347

2348
    This checks that the fields required are valid output fields.
2349

2350
    """
2351
    self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
2352
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2353
                               "admin_state", "admin_ram",
2354
                               "disk_template", "ip", "mac", "bridge",
2355
                               "sda_size", "sdb_size"],
2356
                       dynamic=self.dynamic_fields,
2357
                       selected=self.op.output_fields)
2358

    
2359
    self.wanted = _GetWantedInstances(self, self.op.names)
2360

    
2361
  def Exec(self, feedback_fn):
2362
    """Computes the list of nodes and their attributes.
2363

2364
    """
2365
    instance_names = self.wanted
2366
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2367
                     in instance_names]
2368

    
2369
    # begin data gathering
2370

    
2371
    nodes = frozenset([inst.primary_node for inst in instance_list])
2372

    
2373
    bad_nodes = []
2374
    if self.dynamic_fields.intersection(self.op.output_fields):
2375
      live_data = {}
2376
      node_data = rpc.call_all_instances_info(nodes)
2377
      for name in nodes:
2378
        result = node_data[name]
2379
        if result:
2380
          live_data.update(result)
2381
        elif result == False:
2382
          bad_nodes.append(name)
2383
        # else no instance is alive
2384
    else:
2385
      live_data = dict([(name, {}) for name in instance_names])
2386

    
2387
    # end data gathering
2388

    
2389
    output = []
2390
    for instance in instance_list:
2391
      iout = []
2392
      for field in self.op.output_fields:
2393
        if field == "name":
2394
          val = instance.name
2395
        elif field == "os":
2396
          val = instance.os
2397
        elif field == "pnode":
2398
          val = instance.primary_node
2399
        elif field == "snodes":
2400
          val = list(instance.secondary_nodes)
2401
        elif field == "admin_state":
2402
          val = (instance.status != "down")
2403
        elif field == "oper_state":
2404
          if instance.primary_node in bad_nodes:
2405
            val = None
2406
          else:
2407
            val = bool(live_data.get(instance.name))
2408
        elif field == "admin_ram":
2409
          val = instance.memory
2410
        elif field == "oper_ram":
2411
          if instance.primary_node in bad_nodes:
2412
            val = None
2413
          elif instance.name in live_data:
2414
            val = live_data[instance.name].get("memory", "?")
2415
          else:
2416
            val = "-"
2417
        elif field == "disk_template":
2418
          val = instance.disk_template
2419
        elif field == "ip":
2420
          val = instance.nics[0].ip
2421
        elif field == "bridge":
2422
          val = instance.nics[0].bridge
2423
        elif field == "mac":
2424
          val = instance.nics[0].mac
2425
        elif field == "sda_size" or field == "sdb_size":
2426
          disk = instance.FindDisk(field[:3])
2427
          if disk is None:
2428
            val = None
2429
          else:
2430
            val = disk.size
2431
        else:
2432
          raise errors.ParameterError(field)
2433
        iout.append(val)
2434
      output.append(iout)
2435

    
2436
    return output
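    # Example of the returned structure (illustrative values): with
    # output_fields == ["name", "pnode", "oper_state"] the result is a list
    # of rows such as
    #   [["inst1.example.com", "node1.example.com", True], ...]
    # with one row per queried instance, values in the requested field order.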
2437

    
2438

    
2439
class LUFailoverInstance(LogicalUnit):
2440
  """Failover an instance.
2441

2442
  """
2443
  HPATH = "instance-failover"
2444
  HTYPE = constants.HTYPE_INSTANCE
2445
  _OP_REQP = ["instance_name", "ignore_consistency"]
2446

    
2447
  def BuildHooksEnv(self):
2448
    """Build hooks env.
2449

2450
    This runs on master, primary and secondary nodes of the instance.
2451

2452
    """
2453
    env = {
2454
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2455
      }
2456
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2457
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2458
    return env, nl, nl
2459

    
2460
  def CheckPrereq(self):
2461
    """Check prerequisites.
2462

2463
    This checks that the instance is in the cluster.
2464

2465
    """
2466
    instance = self.cfg.GetInstanceInfo(
2467
      self.cfg.ExpandInstanceName(self.op.instance_name))
2468
    if instance is None:
2469
      raise errors.OpPrereqError("Instance '%s' not known" %
2470
                                 self.op.instance_name)
2471

    
2472
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2473
      raise errors.OpPrereqError("Instance's disk layout is not"
2474
                                 " network mirrored, cannot failover.")
2475

    
2476
    secondary_nodes = instance.secondary_nodes
2477
    if not secondary_nodes:
2478
      raise errors.ProgrammerError("no secondary node but using "
2479
                                   "DT_REMOTE_RAID1 template")
2480

    
2481
    # check memory requirements on the secondary node
2482
    target_node = secondary_nodes[0]
2483
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2484
    info = nodeinfo.get(target_node, None)
2485
    if not info:
2486
      raise errors.OpPrereqError("Cannot get current information"
2487
                                 " from node '%s'" % nodeinfo)
2488
    if instance.memory > info['memory_free']:
2489
      raise errors.OpPrereqError("Not enough memory on target node %s."
2490
                                 " %d MB available, %d MB required" %
2491
                                 (target_node, info['memory_free'],
2492
                                  instance.memory))
2493

    
2494
    # check bridge existence
2495
    brlist = [nic.bridge for nic in instance.nics]
2496
    if not rpc.call_bridges_exist(target_node, brlist):
2497
      raise errors.OpPrereqError("One or more target bridges %s does not"
2498
                                 " exist on destination node '%s'" %
2499
                                 (brlist, target_node))
2500

    
2501
    self.instance = instance
2502

    
2503
  def Exec(self, feedback_fn):
2504
    """Failover an instance.
2505

2506
    The failover is done by shutting it down on its present node and
2507
    starting it on the secondary.
2508

2509
    """
2510
    instance = self.instance
2511

    
2512
    source_node = instance.primary_node
2513
    target_node = instance.secondary_nodes[0]
2514

    
2515
    feedback_fn("* checking disk consistency between source and target")
2516
    for dev in instance.disks:
2517
      # for remote_raid1, these are md over drbd
2518
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2519
        if not self.op.ignore_consistency:
2520
          raise errors.OpExecError("Disk %s is degraded on target node,"
2521
                                   " aborting failover." % dev.iv_name)
2522

    
2523
    feedback_fn("* checking target node resource availability")
2524
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2525

    
2526
    if not nodeinfo:
2527
      raise errors.OpExecError("Could not contact target node %s." %
2528
                               target_node)
2529

    
2530
    free_memory = int(nodeinfo[target_node]['memory_free'])
2531
    memory = instance.memory
2532
    if memory > free_memory:
2533
      raise errors.OpExecError("Not enough memory to create instance %s on"
2534
                               " node %s. needed %s MiB, available %s MiB" %
2535
                               (instance.name, target_node, memory,
2536
                                free_memory))
2537

    
2538
    feedback_fn("* shutting down instance on source node")
2539
    logger.Info("Shutting down instance %s on node %s" %
2540
                (instance.name, source_node))
2541

    
2542
    if not rpc.call_instance_shutdown(source_node, instance):
2543
      if self.op.ignore_consistency:
2544
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2545
                     " anyway. Please make sure node %s is down"  %
2546
                     (instance.name, source_node, source_node))
2547
      else:
2548
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2549
                                 (instance.name, source_node))
2550

    
2551
    feedback_fn("* deactivating the instance's disks on source node")
2552
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2553
      raise errors.OpExecError("Can't shut down the instance's disks.")
2554

    
2555
    instance.primary_node = target_node
2556
    # distribute new instance config to the other nodes
2557
    self.cfg.AddInstance(instance)
2558

    
2559
    feedback_fn("* activating the instance's disks on target node")
2560
    logger.Info("Starting instance %s on node %s" %
2561
                (instance.name, target_node))
2562

    
2563
    disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2564
                                             ignore_secondaries=True)
2565
    if not disks_ok:
2566
      _ShutdownInstanceDisks(instance, self.cfg)
2567
      raise errors.OpExecError("Can't activate the instance's disks")
2568

    
2569
    feedback_fn("* starting the instance on the target node")
2570
    if not rpc.call_instance_start(target_node, instance, None):
2571
      _ShutdownInstanceDisks(instance, self.cfg)
2572
      raise errors.OpExecError("Could not start instance %s on node %s." %
2573
                               (instance.name, target_node))
2574

    
2575

    
2576
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2577
  """Create a tree of block devices on the primary node.
2578

2579
  This always creates all devices.
2580

2581
  """
2582
  if device.children:
2583
    for child in device.children:
2584
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2585
        return False
2586

    
2587
  cfg.SetDiskID(device, node)
2588
  new_id = rpc.call_blockdev_create(node, device, device.size,
2589
                                    instance.name, True, info)
2590
  if not new_id:
2591
    return False
2592
  if device.physical_id is None:
2593
    device.physical_id = new_id
2594
  return True
2595

    
2596

    
2597
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2598
  """Create a tree of block devices on a secondary node.
2599

2600
  If this device type has to be created on secondaries, create it and
2601
  all its children.
2602

2603
  If not, just recurse to children keeping the same 'force' value.
2604

2605
  """
2606
  if device.CreateOnSecondary():
2607
    force = True
2608
  if device.children:
2609
    for child in device.children:
2610
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
2611
                                        child, force, info):
2612
        return False
2613

    
2614
  if not force:
2615
    return True
2616
  cfg.SetDiskID(device, node)
2617
  new_id = rpc.call_blockdev_create(node, device, device.size,
2618
                                    instance.name, False, info)
2619
  if not new_id:
2620
    return False
2621
  if device.physical_id is None:
2622
    device.physical_id = new_id
2623
  return True
2624

    
2625

    
2626
def _GenerateUniqueNames(cfg, exts):
2627
  """Generate a suitable LV name.
2628

2629
  This will generate a logical volume name for the given instance.
2630

2631
  """
2632
  results = []
2633
  for val in exts:
2634
    new_id = cfg.GenerateUniqueID()
2635
    results.append("%s%s" % (new_id, val))
2636
  return results
2637

    
2638

    
2639
def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2640
  """Generate a drbd device complete with its children.
2641

2642
  """
2643
  port = cfg.AllocatePort()
2644
  vgname = cfg.GetVGName()
2645
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2646
                          logical_id=(vgname, names[0]))
2647
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2648
                          logical_id=(vgname, names[1]))
2649
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2650
                          logical_id = (primary, secondary, port),
2651
                          children = [dev_data, dev_meta])
2652
  return drbd_dev
2653

    
2654

    
2655
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2656
  """Generate a drbd8 device complete with its children.
2657

2658
  """
2659
  port = cfg.AllocatePort()
2660
  vgname = cfg.GetVGName()
2661
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2662
                          logical_id=(vgname, names[0]))
2663
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2664
                          logical_id=(vgname, names[1]))
2665
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2666
                          logical_id = (primary, secondary, port),
2667
                          children = [dev_data, dev_meta],
2668
                          iv_name=iv_name)
2669
  return drbd_dev
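# The helper above builds a two-level disk tree: one DRBD8 device of the
# requested size whose children are two LVs in the cluster volume group, a
# data LV of `size` MB and a 128 MB metadata LV, with the DRBD endpoints
# described by the logical_id tuple (primary, secondary, allocated port).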
2670

    
2671
def _GenerateDiskTemplate(cfg, template_name,
2672
                          instance_name, primary_node,
2673
                          secondary_nodes, disk_sz, swap_sz):
2674
  """Generate the entire disk layout for a given template type.
2675

2676
  """
2677
  #TODO: compute space requirements
2678

    
2679
  vgname = cfg.GetVGName()
2680
  if template_name == "diskless":
2681
    disks = []
2682
  elif template_name == "plain":
2683
    if len(secondary_nodes) != 0:
2684
      raise errors.ProgrammerError("Wrong template configuration")
2685

    
2686
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2687
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2688
                           logical_id=(vgname, names[0]),
2689
                           iv_name = "sda")
2690
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2691
                           logical_id=(vgname, names[1]),
2692
                           iv_name = "sdb")
2693
    disks = [sda_dev, sdb_dev]
2694
  elif template_name == "local_raid1":
2695
    if len(secondary_nodes) != 0:
2696
      raise errors.ProgrammerError("Wrong template configuration")
2697

    
2698

    
2699
    names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2700
                                       ".sdb_m1", ".sdb_m2"])
2701
    sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2702
                              logical_id=(vgname, names[0]))
2703
    sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2704
                              logical_id=(vgname, names[1]))
2705
    md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
2706
                              size=disk_sz,
2707
                              children = [sda_dev_m1, sda_dev_m2])
2708
    sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2709
                              logical_id=(vgname, names[2]))
2710
    sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2711
                              logical_id=(vgname, names[3]))
2712
    md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
2713
                              size=swap_sz,
2714
                              children = [sdb_dev_m1, sdb_dev_m2])
2715
    disks = [md_sda_dev, md_sdb_dev]
2716
  elif template_name == constants.DT_REMOTE_RAID1:
2717
    if len(secondary_nodes) != 1:
2718
      raise errors.ProgrammerError("Wrong template configuration")
2719
    remote_node = secondary_nodes[0]
2720
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2721
                                       ".sdb_data", ".sdb_meta"])
2722
    drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2723
                                         disk_sz, names[0:2])
2724
    md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
2725
                              children = [drbd_sda_dev], size=disk_sz)
2726
    drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2727
                                         swap_sz, names[2:4])
2728
    md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
2729
                              children = [drbd_sdb_dev], size=swap_sz)
2730
    disks = [md_sda_dev, md_sdb_dev]
2731
  elif template_name == constants.DT_DRBD8:
2732
    if len(secondary_nodes) != 1:
2733
      raise errors.ProgrammerError("Wrong template configuration")
2734
    remote_node = secondary_nodes[0]
2735
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2736
                                       ".sdb_data", ".sdb_meta"])
2737
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2738
                                         disk_sz, names[0:2], "sda")
2739
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2740
                                         swap_sz, names[2:4], "sdb")
2741
    disks = [drbd_sda_dev, drbd_sdb_dev]
2742
  else:
2743
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2744
  return disks
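# Summary of what each template produces, as implemented above: "diskless"
# returns no disks; "plain" returns two plain LVs (sda, sdb); "local_raid1"
# returns two MD RAID1 devices, each over a pair of LVs; DT_REMOTE_RAID1
# returns two MD devices, each over a DRBD7 branch; DT_DRBD8 returns two
# DRBD8 devices (sda, sdb) mirrored between the primary and the single
# secondary node.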
2745

    
2746

    
2747
def _GetInstanceInfoText(instance):
2748
  """Compute that text that should be added to the disk's metadata.
2749

2750
  """
2751
  return "originstname+%s" % instance.name
2752

    
2753

    
2754
def _CreateDisks(cfg, instance):
2755
  """Create all disks for an instance.
2756

2757
  This abstracts away some work from AddInstance.
2758

2759
  Args:
2760
    instance: the instance object
2761

2762
  Returns:
2763
    True or False showing the success of the creation process
2764

2765
  """
2766
  info = _GetInstanceInfoText(instance)
2767

    
2768
  for device in instance.disks:
2769
    logger.Info("creating volume %s for instance %s" %
2770
              (device.iv_name, instance.name))
2771
    #HARDCODE
2772
    for secondary_node in instance.secondary_nodes:
2773
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2774
                                        device, False, info):
2775
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2776
                     (device.iv_name, device, secondary_node))
2777
        return False
2778
    #HARDCODE
2779
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2780
                                    instance, device, info):
2781
      logger.Error("failed to create volume %s on primary!" %
2782
                   device.iv_name)
2783
      return False
2784
  return True
2785

    
2786

    
2787
def _RemoveDisks(instance, cfg):
2788
  """Remove all disks for an instance.
2789

2790
  This abstracts away some work from `AddInstance()` and
2791
  `RemoveInstance()`. Note that in case some of the devices couldn't
2792
  be removed, the removal will continue with the other ones (compare
2793
  with `_CreateDisks()`).
2794

2795
  Args:
2796
    instance: the instance object
2797

2798
  Returns:
2799
    True or False showing the success of the removal process
2800

2801
  """
2802
  logger.Info("removing block devices for instance %s" % instance.name)
2803

    
2804
  result = True
2805
  for device in instance.disks:
2806
    for node, disk in device.ComputeNodeTree(instance.primary_node):
2807
      cfg.SetDiskID(disk, node)
2808
      if not rpc.call_blockdev_remove(node, disk):
2809
        logger.Error("could not remove block device %s on node %s,"
2810
                     " continuing anyway" %
2811
                     (device.iv_name, node))
2812
        result = False
2813
  return result
2814

    
2815

    
2816
class LUCreateInstance(LogicalUnit):
2817
  """Create an instance.
2818

2819
  """
2820
  HPATH = "instance-add"
2821
  HTYPE = constants.HTYPE_INSTANCE
2822
  _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2823
              "disk_template", "swap_size", "mode", "start", "vcpus",
2824
              "wait_for_sync", "ip_check"]
2825

    
2826
  def BuildHooksEnv(self):
2827
    """Build hooks env.
2828

2829
    This runs on master, primary and secondary nodes of the instance.
2830

2831
    """
2832
    env = {
2833
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2834
      "INSTANCE_DISK_SIZE": self.op.disk_size,
2835
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
2836
      "INSTANCE_ADD_MODE": self.op.mode,
2837
      }
2838
    if self.op.mode == constants.INSTANCE_IMPORT:
2839
      env["INSTANCE_SRC_NODE"] = self.op.src_node
2840
      env["INSTANCE_SRC_PATH"] = self.op.src_path
2841
      env["INSTANCE_SRC_IMAGE"] = self.src_image
2842

    
2843
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2844
      primary_node=self.op.pnode,
2845
      secondary_nodes=self.secondaries,
2846
      status=self.instance_status,
2847
      os_type=self.op.os_type,
2848
      memory=self.op.mem_size,
2849
      vcpus=self.op.vcpus,
2850
      nics=[(self.inst_ip, self.op.bridge)],
2851
    ))
2852

    
2853
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2854
          self.secondaries)
2855
    return env, nl, nl
2856

    
2857

    
2858
  def CheckPrereq(self):
2859
    """Check prerequisites.
2860

2861
    """
2862
    if self.op.mode not in (constants.INSTANCE_CREATE,
2863
                            constants.INSTANCE_IMPORT):
2864
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2865
                                 self.op.mode)
2866

    
2867
    if self.op.mode == constants.INSTANCE_IMPORT:
2868
      src_node = getattr(self.op, "src_node", None)
2869
      src_path = getattr(self.op, "src_path", None)
2870
      if src_node is None or src_path is None:
2871
        raise errors.OpPrereqError("Importing an instance requires source"
2872
                                   " node and path options")
2873
      src_node_full = self.cfg.ExpandNodeName(src_node)
2874
      if src_node_full is None:
2875
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2876
      self.op.src_node = src_node = src_node_full
2877

    
2878
      if not os.path.isabs(src_path):
2879
        raise errors.OpPrereqError("The source path must be absolute")
2880

    
2881
      export_info = rpc.call_export_info(src_node, src_path)
2882

    
2883
      if not export_info:
2884
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
2885

    
2886
      if not export_info.has_section(constants.INISECT_EXP):
2887
        raise errors.ProgrammerError("Corrupted export config")
2888

    
2889
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
2890
      if (int(ei_version) != constants.EXPORT_VERSION):
2891
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2892
                                   (ei_version, constants.EXPORT_VERSION))
2893

    
2894
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2895
        raise errors.OpPrereqError("Can't import instance with more than"
2896
                                   " one data disk")
2897

    
2898
      # FIXME: are the old os-es, disk sizes, etc. useful?
2899
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2900
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2901
                                                         'disk0_dump'))
2902
      self.src_image = diskimage
2903
    else: # INSTANCE_CREATE
2904
      if getattr(self.op, "os_type", None) is None:
2905
        raise errors.OpPrereqError("No guest OS specified")
2906

    
2907
    # check primary node
2908
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2909
    if pnode is None:
2910
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
2911
                                 self.op.pnode)
2912
    self.op.pnode = pnode.name
2913
    self.pnode = pnode
2914
    self.secondaries = []
2915
    # disk template and mirror node verification
2916
    if self.op.disk_template not in constants.DISK_TEMPLATES:
2917
      raise errors.OpPrereqError("Invalid disk template name")
2918

    
2919
    if self.op.disk_template in constants.DTS_NET_MIRROR:
2920
      if getattr(self.op, "snode", None) is None:
2921
        raise errors.OpPrereqError("The networked disk templates need"
2922
                                   " a mirror node")
2923

    
2924
      snode_name = self.cfg.ExpandNodeName(self.op.snode)
2925
      if snode_name is None:
2926
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
2927
                                   self.op.snode)
2928
      elif snode_name == pnode.name:
2929
        raise errors.OpPrereqError("The secondary node cannot be"
2930
                                   " the primary node.")
2931
      self.secondaries.append(snode_name)
2932

    
2933
    # Check lv size requirements
2934
    nodenames = [pnode.name] + self.secondaries
2935
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2936

    
2937
    # Required free disk space as a function of disk and swap space
2938
    req_size_dict = {
2939
      constants.DT_DISKLESS: 0,
2940
      constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2941
      constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2942
      # 256 MB are added for drbd metadata, 128MB for each drbd device
2943
      constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2944
      constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
2945
    }
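    # Worked example (illustrative sizes): for a DT_DRBD8 instance with
    # disk_size=10240 and swap_size=4096, the required free space is
    # 10240 + 4096 + 256 = 14592 MB in the volume group on each node, i.e.
    # req_size_dict[constants.DT_DRBD8] == 14592 for those values.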
2946

    
2947
    if self.op.disk_template not in req_size_dict:
2948
      raise errors.ProgrammerError("Disk template '%s' size requirement"
2949
                                   " is unknown" %  self.op.disk_template)
2950

    
2951
    req_size = req_size_dict[self.op.disk_template]
2952

    
2953
    for node in nodenames:
2954
      info = nodeinfo.get(node, None)
2955
      if not info:
2956
        raise errors.OpPrereqError("Cannot get current information"
2957
                                   " from node '%s'" % nodeinfo)
2958
      if req_size > info['vg_free']:
2959
        raise errors.OpPrereqError("Not enough disk space on target node %s."
2960
                                   " %d MB available, %d MB required" %
2961
                                   (node, info['vg_free'], req_size))
2962

    
2963
    # os verification
2964
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2965
    if not os_obj:
2966
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
2967
                                 " primary node"  % self.op.os_type)
2968

    
2969
    # instance verification
2970
    hostname1 = utils.HostInfo(self.op.instance_name)
2971

    
2972
    self.op.instance_name = instance_name = hostname1.name
2973
    instance_list = self.cfg.GetInstanceList()
2974
    if instance_name in instance_list:
2975
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2976
                                 instance_name)
2977

    
2978
    ip = getattr(self.op, "ip", None)
2979
    if ip is None or ip.lower() == "none":
2980
      inst_ip = None
2981
    elif ip.lower() == "auto":
2982
      inst_ip = hostname1.ip
2983
    else:
2984
      if not utils.IsValidIP(ip):
2985
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
2986
                                   " like a valid IP" % ip)
2987
      inst_ip = ip
2988
    self.inst_ip = inst_ip
2989

    
2990
    if self.op.start and not self.op.ip_check:
2991
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
2992
                                 " adding an instance in start mode")
2993

    
2994
    if self.op.ip_check:
2995
      if utils.TcpPing(utils.HostInfo().name, hostname1.ip,
2996
                       constants.DEFAULT_NODED_PORT):
2997
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2998
                                   (hostname1.ip, instance_name))
2999

    
3000
    # bridge verification
3001
    bridge = getattr(self.op, "bridge", None)
3002
    if bridge is None:
3003
      self.op.bridge = self.cfg.GetDefBridge()
3004
    else:
3005
      self.op.bridge = bridge
3006

    
3007
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3008
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
3009
                                 " destination node '%s'" %
3010
                                 (self.op.bridge, pnode.name))
3011

    
3012
    if self.op.start:
3013
      self.instance_status = 'up'
3014
    else:
3015
      self.instance_status = 'down'
3016

    
3017
  def Exec(self, feedback_fn):
3018
    """Create and add the instance to the cluster.
3019

3020
    """
3021
    instance = self.op.instance_name
3022
    pnode_name = self.pnode.name
3023

    
3024
    nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
3025
    if self.inst_ip is not None:
3026
      nic.ip = self.inst_ip
3027

    
3028
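    # _GenerateDiskTemplate only builds the in-memory disk descriptions
    # for the chosen template; the actual devices are created further
    # down via _CreateDisks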
    disks = _GenerateDiskTemplate(self.cfg,
3029
                                  self.op.disk_template,
3030
                                  instance, pnode_name,
3031
                                  self.secondaries, self.op.disk_size,
3032
                                  self.op.swap_size)
3033

    
3034
    iobj = objects.Instance(name=instance, os=self.op.os_type,
3035
                            primary_node=pnode_name,
3036
                            memory=self.op.mem_size,
3037
                            vcpus=self.op.vcpus,
3038
                            nics=[nic], disks=disks,
3039
                            disk_template=self.op.disk_template,
3040
                            status=self.instance_status,
3041
                            )
3042

    
3043
    feedback_fn("* creating instance disks...")
3044
    if not _CreateDisks(self.cfg, iobj):
3045
      _RemoveDisks(iobj, self.cfg)
3046
      raise errors.OpExecError("Device creation failed, reverting...")
3047

    
3048
    feedback_fn("adding instance %s to cluster config" % instance)
3049

    
3050
    self.cfg.AddInstance(iobj)
3051

    
3052
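    # if asked to, wait for the disks to fully sync; otherwise, for
    # network-mirrored templates, only make sure the devices are not
    # degraded (still syncing is acceptable)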
    if self.op.wait_for_sync:
3053
      disk_abort = not _WaitForSync(self.cfg, iobj)
3054
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
3055
      # make sure the disks are not degraded (still sync-ing is ok)
3056
      time.sleep(15)
3057
      feedback_fn("* checking mirrors status")
3058
      disk_abort = not _WaitForSync(self.cfg, iobj, oneshot=True)
3059
    else:
3060
      disk_abort = False
3061

    
3062
    if disk_abort:
3063
      _RemoveDisks(iobj, self.cfg)
3064
      self.cfg.RemoveInstance(iobj.name)
3065
      raise errors.OpExecError("There are some degraded disks for"
3066
                               " this instance")
3067

    
3068
    feedback_fn("creating os for instance %s on node %s" %
3069
                (instance, pnode_name))
3070

    
3071
    if iobj.disk_template != constants.DT_DISKLESS:
3072
      if self.op.mode == constants.INSTANCE_CREATE:
3073
        feedback_fn("* running the instance OS create scripts...")
3074
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3075
          raise errors.OpExecError("could not add os for instance %s"
3076
                                   " on node %s" %
3077
                                   (instance, pnode_name))
3078

    
3079
      elif self.op.mode == constants.INSTANCE_IMPORT:
3080
        feedback_fn("* running the instance OS import scripts...")
3081
        src_node = self.op.src_node
3082
        src_image = self.src_image
3083
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3084
                                                src_node, src_image):
3085
          raise errors.OpExecError("Could not import os for instance"
3086
                                   " %s on node %s" %
3087
                                   (instance, pnode_name))
3088
      else:
3089
        # also checked in the prereq part
3090
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3091
                                     % self.op.mode)
3092

    
3093
    if self.op.start:
3094
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3095
      feedback_fn("* starting instance...")
3096
      if not rpc.call_instance_start(pnode_name, iobj, None):
3097
        raise errors.OpExecError("Could not start instance")
3098

    
3099

    
3100
class LUConnectConsole(NoHooksLU):
3101
  """Connect to an instance's console.
3102

3103
  This is somewhat special in that it returns the command line that
3104
  you need to run on the master node in order to connect to the
3105
  console.
3106

3107
  """
3108
  _OP_REQP = ["instance_name"]
3109

    
3110
  def CheckPrereq(self):
3111
    """Check prerequisites.
3112

3113
    This checks that the instance is in the cluster.
3114

3115
    """
3116
    instance = self.cfg.GetInstanceInfo(
3117
      self.cfg.ExpandInstanceName(self.op.instance_name))
3118
    if instance is None:
3119
      raise errors.OpPrereqError("Instance '%s' not known" %
3120
                                 self.op.instance_name)
3121
    self.instance = instance
3122

    
3123
  def Exec(self, feedback_fn):
3124
    """Connect to the console of an instance
3125

3126
    """
3127
    instance = self.instance
3128
    node = instance.primary_node
3129

    
3130
    node_insts = rpc.call_instance_list([node])[node]
3131
    if node_insts is False:
3132
      raise errors.OpExecError("Can't connect to node %s." % node)
3133

    
3134
    if instance.name not in node_insts:
3135
      raise errors.OpExecError("Instance %s is not running." % instance.name)
3136

    
3137
    logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3138

    
3139
    hyper = hypervisor.GetHypervisor()
3140
    console_cmd = hyper.GetShellCommandForConsole(instance.name)
3141
    # build ssh cmdline
3142
    argv = ["ssh", "-q", "-t"]
3143
    argv.extend(ssh.KNOWN_HOSTS_OPTS)
3144
    argv.extend(ssh.BATCH_MODE_OPTS)
3145
    argv.append(node)
3146
    argv.append(console_cmd)
3147
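    # this LU does not open the console itself; the caller (presumably
    # the command-line client) is expected to exec the returned command,
    # e.g. roughly:
    #   shell, argv = result
    #   os.execvp(argv[0], argv)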
    return "ssh", argv
3148

    
3149

    
3150
class LUAddMDDRBDComponent(LogicalUnit):
3151
  """Adda new mirror member to an instance's disk.
3152

3153
  """
3154
  HPATH = "mirror-add"
3155
  HTYPE = constants.HTYPE_INSTANCE
3156
  _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3157

    
3158
  def BuildHooksEnv(self):
3159
    """Build hooks env.
3160

3161
    This runs on the master, the primary and all the secondaries.
3162

3163
    """
3164
    env = {
3165
      "NEW_SECONDARY": self.op.remote_node,
3166
      "DISK_NAME": self.op.disk_name,
3167
      }
3168
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3169
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3170
          self.op.remote_node,] + list(self.instance.secondary_nodes)
3171
    return env, nl, nl
3172

    
3173
  def CheckPrereq(self):
3174
    """Check prerequisites.
3175

3176
    This checks that the instance is in the cluster.
3177

3178
    """
3179
    instance = self.cfg.GetInstanceInfo(
3180
      self.cfg.ExpandInstanceName(self.op.instance_name))
3181
    if instance is None:
3182
      raise errors.OpPrereqError("Instance '%s' not known" %
3183
                                 self.op.instance_name)
3184
    self.instance = instance
3185

    
3186
    remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3187
    if remote_node is None:
3188
      raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3189
    self.remote_node = remote_node
3190

    
3191
    if remote_node == instance.primary_node:
3192
      raise errors.OpPrereqError("The specified node is the primary node of"
3193
                                 " the instance.")
3194

    
3195
    if instance.disk_template != constants.DT_REMOTE_RAID1:
3196
      raise errors.OpPrereqError("Instance's disk layout is not"
3197
                                 " remote_raid1.")
3198
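    # locate the requested disk by its iv_name; the for/else only
    # raises when no disk matched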
    for disk in instance.disks:
3199
      if disk.iv_name == self.op.disk_name:
3200
        break
3201
    else:
3202
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
3203
                                 " instance." % self.op.disk_name)
3204
    if len(disk.children) > 1:
3205
      raise errors.OpPrereqError("The device already has two slave"
3206
                                 " devices.\n"
3207
                                 "This would create a 3-disk raid1"
3208
                                 " which we don't allow.")
3209
    self.disk = disk
3210

    
3211
  def Exec(self, feedback_fn):
3212
    """Add the mirror component
3213

3214
    """
3215
    disk = self.disk
3216
    instance = self.instance
3217

    
3218
    remote_node = self.remote_node
3219
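    # allocate unique LV names (data and meta) and describe the new
    # drbd mirror branch between the primary and the new secondary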
    lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3220
    names = _GenerateUniqueNames(self.cfg, lv_names)
3221
    new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3222
                                     remote_node, disk.size, names)
3223

    
3224
    logger.Info("adding new mirror component on secondary")
3225
    #HARDCODE
3226
    if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
3227
                                      new_drbd, False,
3228
                                      _GetInstanceInfoText(instance)):
3229
      raise errors.OpExecError("Failed to create new component on secondary"
3230
                               " node %s" % remote_node)
3231

    
3232
    logger.Info("adding new mirror component on primary")
3233
    #HARDCODE
3234
    if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
3235
                                    instance, new_drbd,
3236
                                    _GetInstanceInfoText(instance)):
3237
      # remove secondary dev
3238
      self.cfg.SetDiskID(new_drbd, remote_node)
3239
      rpc.call_blockdev_remove(remote_node, new_drbd)
3240
      raise errors.OpExecError("Failed to create volume on primary")
3241

    
3242
    # the device exists now
3243
    # call the primary node to add the mirror to md
3244
    logger.Info("adding new mirror component to md")
3245
    if not rpc.call_blockdev_addchildren(instance.primary_node,
3246
                                         disk, [new_drbd]):
3247
      logger.Error("Can't add mirror compoment to md!")
3248
      self.cfg.SetDiskID(new_drbd, remote_node)
3249
      if not rpc.call_blockdev_remove(remote_node, new_drbd):
3250
        logger.Error("Can't rollback on secondary")
3251
      self.cfg.SetDiskID(new_drbd, instance.primary_node)
3252
      if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3253
        logger.Error("Can't rollback on primary")
3254
      raise errors.OpExecError("Can't add mirror component to md array")
3255

    
3256
    disk.children.append(new_drbd)
3257

    
3258
    self.cfg.AddInstance(instance)
3259

    
3260
    _WaitForSync(self.cfg, instance)
3261

    
3262
    return 0
3263

    
3264

    
3265
class LURemoveMDDRBDComponent(LogicalUnit):
3266
  """Remove a component from a remote_raid1 disk.
3267

3268
  """
3269
  HPATH = "mirror-remove"
3270
  HTYPE = constants.HTYPE_INSTANCE
3271
  _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3272

    
3273
  def BuildHooksEnv(self):
3274
    """Build hooks env.
3275

3276
    This runs on the master, the primary and all the secondaries.
3277

3278
    """
3279
    env = {
3280
      "DISK_NAME": self.op.disk_name,
3281
      "DISK_ID": self.op.disk_id,
3282
      "OLD_SECONDARY": self.old_secondary,
3283
      }
3284
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3285
    nl = [self.sstore.GetMasterNode(),
3286
          self.instance.primary_node] + list(self.instance.secondary_nodes)
3287
    return env, nl, nl
3288

    
3289
  def CheckPrereq(self):
3290
    """Check prerequisites.
3291

3292
    This checks that the instance is in the cluster.
3293

3294
    """
3295
    instance = self.cfg.GetInstanceInfo(
3296
      self.cfg.ExpandInstanceName(self.op.instance_name))
3297
    if instance is None:
3298
      raise errors.OpPrereqError("Instance '%s' not known" %
3299
                                 self.op.instance_name)
3300
    self.instance = instance
3301

    
3302
    if instance.disk_template != constants.DT_REMOTE_RAID1:
3303
      raise errors.OpPrereqError("Instance's disk layout is not"
3304
                                 " remote_raid1.")
3305
    for disk in instance.disks:
3306
      if disk.iv_name == self.op.disk_name:
3307
        break
3308
    else:
3309
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
3310
                                 " instance." % self.op.disk_name)
3311
    for child in disk.children:
3312
      if (child.dev_type == constants.LD_DRBD7 and
3313
          child.logical_id[2] == self.op.disk_id):
3314
        break
3315
    else:
3316
      raise errors.OpPrereqError("Can't find the device with this port.")
3317

    
3318
    if len(disk.children) < 2:
3319
      raise errors.OpPrereqError("Cannot remove the last component from"
3320
                                 " a mirror.")
3321
    self.disk = disk
3322
    self.child = child
3323
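    # logical_id holds the two peer nodes (plus the port); whichever of
    # them is not the primary is the old secondary we detach from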
    if self.child.logical_id[0] == instance.primary_node:
3324
      oid = 1
3325
    else:
3326
      oid = 0
3327
    self.old_secondary = self.child.logical_id[oid]
3328

    
3329
  def Exec(self, feedback_fn):
3330
    """Remove the mirror component
3331

3332
    """
3333
    instance = self.instance
3334
    disk = self.disk
3335
    child = self.child
3336
    logger.Info("remove mirror component")
3337
    self.cfg.SetDiskID(disk, instance.primary_node)
3338
    if not rpc.call_blockdev_removechildren(instance.primary_node,
3339
                                            disk, [child]):
3340
      raise errors.OpExecError("Can't remove child from mirror.")
3341

    
3342
    for node in child.logical_id[:2]:
3343
      self.cfg.SetDiskID(child, node)
3344
      if not rpc.call_blockdev_remove(node, child):
3345
        logger.Error("Warning: failed to remove device from node %s,"
3346
                     " continuing operation." % node)
3347

    
3348
    disk.children.remove(child)
3349
    self.cfg.AddInstance(instance)
3350

    
3351

    
3352
class LUReplaceDisks(LogicalUnit):
3353
  """Replace the disks of an instance.
3354

3355
  """
3356
  HPATH = "mirrors-replace"
3357
  HTYPE = constants.HTYPE_INSTANCE
3358
  _OP_REQP = ["instance_name", "mode", "disks"]
3359

    
3360
  def BuildHooksEnv(self):
3361
    """Build hooks env.
3362

3363
    This runs on the master, the primary and all the secondaries.
3364

3365
    """
3366
    env = {
3367
      "MODE": self.op.mode,
3368
      "NEW_SECONDARY": self.op.remote_node,
3369
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
3370
      }
3371
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3372
    nl = [
3373
      self.sstore.GetMasterNode(),
3374
      self.instance.primary_node,
3375
      ]
3376
    if self.op.remote_node is not None:
3377
      nl.append(self.op.remote_node)
3378
    return env, nl, nl
3379

    
3380
  def CheckPrereq(self):
3381
    """Check prerequisites.
3382

3383
    This checks that the instance is in the cluster.
3384

3385
    """
3386
    instance = self.cfg.GetInstanceInfo(
3387
      self.cfg.ExpandInstanceName(self.op.instance_name))
3388
    if instance is None:
3389
      raise errors.OpPrereqError("Instance '%s' not known" %
3390
                                 self.op.instance_name)
3391
    self.instance = instance
3392

    
3393
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3394
      raise errors.OpPrereqError("Instance's disk layout is not"
3395
                                 " network mirrored.")
3396

    
3397
    if len(instance.secondary_nodes) != 1:
3398
      raise errors.OpPrereqError("The instance has a strange layout,"
3399
                                 " expected one secondary but found %d" %
3400
                                 len(instance.secondary_nodes))
3401

    
3402
    self.sec_node = instance.secondary_nodes[0]
3403

    
3404
    remote_node = getattr(self.op, "remote_node", None)
3405
    if remote_node is not None:
3406
      remote_node = self.cfg.ExpandNodeName(remote_node)
3407
      if remote_node is None:
3408
        raise errors.OpPrereqError("Node '%s' not known" %
3409
                                   self.op.remote_node)
3410
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3411
    else:
3412
      self.remote_node_info = None
3413
    if remote_node == instance.primary_node:
3414
      raise errors.OpPrereqError("The specified node is the primary node of"
3415
                                 " the instance.")
3416
    elif remote_node == self.sec_node:
3417
      if self.op.mode == constants.REPLACE_DISK_SEC:
3418
        # this is for DRBD8, where we can't execute the same mode of
3419
        # replacement as for drbd7 (no different port allocated)
3420
        raise errors.OpPrereqError("Same secondary given, cannot execute"
3421
                                   " replacement")
3422
      # the user gave the current secondary, switch to
3423
      # 'no-replace-secondary' mode for drbd7
3424
      remote_node = None
3425
    if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3426
        self.op.mode != constants.REPLACE_DISK_ALL):
3427
      raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3428
                                 " disks replacement, not individual ones")
3429
    if instance.disk_template == constants.DT_DRBD8:
3430
      if self.op.mode == constants.REPLACE_DISK_ALL:
3431
        raise errors.OpPrereqError("Template 'drbd8' only allows primary or"
3432
                                   " secondary disk replacement, not"
3433
                                   " both at once")
3434
      elif self.op.mode == constants.REPLACE_DISK_PRI:
3435
        if remote_node is not None:
3436
          raise errors.OpPrereqError("Template 'drbd8' does not allow changing"
3437
                                     " the secondary while doing a primary"
3438
                                     " node disk replacement")
3439
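        # tgt_node is where the new LVs will be created, oth_node is
        # the peer whose data must stay consistent during the operation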
        self.tgt_node = instance.primary_node
3440
        self.oth_node = instance.secondary_nodes[0]
3441
      elif self.op.mode == constants.REPLACE_DISK_SEC:
3442
        self.new_node = remote_node # this can be None, in which case
3443
                                    # we don't change the secondary
3444
        self.tgt_node = instance.secondary_nodes[0]
3445
        self.oth_node = instance.primary_node
3446
      else:
3447
        raise errors.ProgrammerError("Unhandled disk replace mode")
3448

    
3449
    for name in self.op.disks:
3450
      if instance.FindDisk(name) is None:
3451
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3452
                                   (name, instance.name))
3453
    self.op.remote_node = remote_node
3454

    
3455
  def _ExecRR1(self, feedback_fn):
3456
    """Replace the disks of an instance.
3457

3458
    """
3459
    instance = self.instance
3460
    iv_names = {}
3461
    # start of work
3462
    if self.op.remote_node is None:
3463
      remote_node = self.sec_node
3464
    else:
3465
      remote_node = self.op.remote_node
3466
    cfg = self.cfg
3467
    for dev in instance.disks:
3468
      size = dev.size
3469
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3470
      names = _GenerateUniqueNames(cfg, lv_names)
3471
      new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3472
                                       remote_node, size, names)
3473
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3474
      logger.Info("adding new mirror component on secondary for %s" %
3475
                  dev.iv_name)
3476
      #HARDCODE
3477
      if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3478
                                        new_drbd, False,
3479
                                        _GetInstanceInfoText(instance)):
3480
        raise errors.OpExecError("Failed to create new component on"
3481
                                 " secondary node %s\n"
3482
                                 "Full abort, cleanup manually!" %
3483
                                 remote_node)
3484

    
3485
      logger.Info("adding new mirror component on primary")
3486
      #HARDCODE
3487
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3488
                                      instance, new_drbd,
3489
                                      _GetInstanceInfoText(instance)):
3490
        # remove secondary dev
3491
        cfg.SetDiskID(new_drbd, remote_node)
3492
        rpc.call_blockdev_remove(remote_node, new_drbd)
3493
        raise errors.OpExecError("Failed to create volume on primary!\n"
3494
                                 "Full abort, cleanup manually!!")
3495

    
3496
      # the device exists now
3497
      # call the primary node to add the mirror to md
3498
      logger.Info("adding new mirror component to md")
3499
      if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3500
                                           [new_drbd]):
3501
        logger.Error("Can't add mirror compoment to md!")
3502
        cfg.SetDiskID(new_drbd, remote_node)
3503
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
3504
          logger.Error("Can't rollback on secondary")
3505
        cfg.SetDiskID(new_drbd, instance.primary_node)
3506
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3507
          logger.Error("Can't rollback on primary")
3508
        raise errors.OpExecError("Full abort, cleanup manually!!")
3509

    
3510
      dev.children.append(new_drbd)
3511
      cfg.AddInstance(instance)
3512

    
3513
    # this can fail as the old devices are degraded and _WaitForSync
3514
    # does a combined result over all disks, so we don't check its
3515
    # return value
3516
    _WaitForSync(cfg, instance, unlock=True)
3517

    
3518
    # so check manually all the devices
3519
    for name in iv_names:
3520
      dev, child, new_drbd = iv_names[name]
3521
      cfg.SetDiskID(dev, instance.primary_node)
3522
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3523
      if is_degr:
3524
        raise errors.OpExecError("MD device %s is degraded!" % name)
3525
      cfg.SetDiskID(new_drbd, instance.primary_node)
3526
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3527
      if is_degr:
3528
        raise errors.OpExecError("New drbd device %s is degraded!" % name)
3529

    
3530
    for name in iv_names:
3531
      dev, child, new_drbd = iv_names[name]
3532
      logger.Info("remove mirror %s component" % name)
3533
      cfg.SetDiskID(dev, instance.primary_node)
3534
      if not rpc.call_blockdev_removechildren(instance.primary_node,
3535
                                              dev, [child]):
3536
        logger.Error("Can't remove child from mirror, aborting"
3537
                     " *this device cleanup*.\nYou need to cleanup manually!!")
3538
        continue
3539

    
3540
      for node in child.logical_id[:2]:
3541
        logger.Info("remove child device on %s" % node)
3542
        cfg.SetDiskID(child, node)
3543
        if not rpc.call_blockdev_remove(node, child):
3544
          logger.Error("Warning: failed to remove device from node %s,"
3545
                       " continuing operation." % node)
3546

    
3547
      dev.children.remove(child)
3548

    
3549
      cfg.AddInstance(instance)
3550

    
3551
  def _ExecD8DiskOnly(self, feedback_fn):
3552
    """Replace a disk on the primary or secondary for dbrd8.
3553

3554
    The algorithm for replace is quite complicated:
3555
      - for each disk to be replaced:
3556
        - create new LVs on the target node with unique names
3557
        - detach old LVs from the drbd device
3558
        - rename old LVs to name_replaced.<time_t>
3559
        - rename new LVs to old LVs
3560
        - attach the new LVs (with the old names now) to the drbd device
3561
      - wait for sync across all devices
3562
      - for each modified disk:
3563
        - remove old LVs (which have the name name_replaced.<time_t>)
3564

3565
    Failures are not very well handled.
3566

3567
    """
3568
    steps_total = 6
3569
    warning, info = (self.processor.LogWarning, self.processor.LogInfo)
3570
    instance = self.instance
3571
    iv_names = {}
3572
    vgname = self.cfg.GetVGName()
3573
    # start of work
3574
    cfg = self.cfg
3575
    tgt_node = self.tgt_node
3576
    oth_node = self.oth_node
3577

    
3578
    # Step: check device activation
3579
    self.processor.LogStep(1, steps_total, "check device existence")
3580
    info("checking volume groups")
3581
    my_vg = cfg.GetVGName()
3582
    results = rpc.call_vg_list([oth_node, tgt_node])
3583
    if not results:
3584
      raise errors.OpExecError("Can't list volume groups on the nodes")
3585
    for node in oth_node, tgt_node:
3586
      res = results.get(node, False)
3587
      if not res or my_vg not in res:
3588
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3589
                                 (my_vg, node))
3590
    for dev in instance.disks:
3591
      if not dev.iv_name in self.op.disks:
3592
        continue
3593
      for node in tgt_node, oth_node:
3594
        info("checking %s on %s" % (dev.iv_name, node))
3595
        cfg.SetDiskID(dev, node)
3596
        if not rpc.call_blockdev_find(node, dev):
3597
          raise errors.OpExecError("Can't find device %s on node %s" %
3598
                                   (dev.iv_name, node))
3599

    
3600
    # Step: check other node consistency
3601
    self.processor.LogStep(2, steps_total, "check peer consistency")
3602
    for dev in instance.disks:
3603
      if not dev.iv_name in self.op.disks:
3604
        continue
3605
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3606
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3607
                                   oth_node==instance.primary_node):
3608
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3609
                                 " to replace disks on this node (%s)" %
3610
                                 (oth_node, tgt_node))
3611

    
3612
    # Step: create new storage
3613
    self.processor.LogStep(3, steps_total, "allocate new storage")
3614
    for dev in instance.disks:
3615
      if not dev.iv_name in self.op.disks:
3616
        continue
3617
      size = dev.size
3618
      cfg.SetDiskID(dev, tgt_node)
3619
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3620
      names = _GenerateUniqueNames(cfg, lv_names)
3621
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3622
                             logical_id=(vgname, names[0]))
3623
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3624
                             logical_id=(vgname, names[1]))
3625
      new_lvs = [lv_data, lv_meta]
3626
      old_lvs = dev.children
3627
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3628
      info("creating new local storage on %s for %s" %
3629
           (tgt_node, dev.iv_name))
3630
      # since we *always* want to create this LV, we use the
3631
      # _Create...OnPrimary (which forces the creation), even if we
3632
      # are talking about the secondary node
3633
      for new_lv in new_lvs:
3634
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3635
                                        _GetInstanceInfoText(instance)):
3636
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3637
                                   " node '%s'" %
3638
                                   (new_lv.logical_id[1], tgt_node))
3639

    
3640
    # Step: for each lv, detach+rename*2+attach
3641
    self.processor.LogStep(4, steps_total, "change drbd configuration")
3642
    for dev, old_lvs, new_lvs in iv_names.itervalues():
3643
      info("detaching %s drbd from local storage" % dev.iv_name)
3644
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3645
        raise errors.OpExecError("Can't detach drbd from local storage on node"
3646
                                 " %s for device %s" % (tgt_node, dev.iv_name))
3647
      #dev.children = []
3648
      #cfg.Update(instance)
3649

    
3650
      # ok, we created the new LVs, so now we know we have the needed
3651
      # storage; as such, we proceed on the target node to rename
3652
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3653
      # using the assumption than logical_id == physical_id (which in
3654
      # turn is the unique_id on that node)
3655

    
3656
      # FIXME(iustin): use a better name for the replaced LVs
3657
      temp_suffix = int(time.time())
3658
      ren_fn = lambda d, suff: (d.physical_id[0],
3659
                                d.physical_id[1] + "_replaced-%s" % suff)
3660
      # build the rename list based on what LVs exist on the node
3661
      rlist = []
3662
      for to_ren in old_lvs:
3663
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3664
        if find_res is not None: # device exists
3665
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3666

    
3667
      info("renaming the old LVs on the target node")
3668
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3669
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3670
      # now we rename the new LVs to the old LVs
3671
      info("renaming the new LVs on the target node")
3672
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3673
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3674
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3675

    
3676
      for old, new in zip(old_lvs, new_lvs):
3677
        new.logical_id = old.logical_id
3678
        cfg.SetDiskID(new, tgt_node)
3679

    
3680
      for disk in old_lvs:
3681
        disk.logical_id = ren_fn(disk, temp_suffix)
3682
        cfg.SetDiskID(disk, tgt_node)
3683

    
3684
      # now that the new lvs have the old name, we can add them to the device
3685
      info("adding new mirror component on %s" % tgt_node)
3686
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3687
        for new_lv in new_lvs:
3688
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
3689
            warning("Can't rollback device %s", "manually cleanup unused"
3690
                    " logical volumes")
3691
        raise errors.OpExecError("Can't add local storage to drbd")
3692

    
3693
      dev.children = new_lvs
3694
      cfg.Update(instance)
3695

    
3696
    # Step: wait for sync
3697

    
3698
    # this can fail as the old devices are degraded and _WaitForSync
3699
    # does a combined result over all disks, so we don't check its
3700
    # return value
3701
    self.processor.LogStep(5, steps_total, "sync devices")
3702
    _WaitForSync(cfg, instance, unlock=True)
3703

    
3704
    # so check manually all the devices
3705
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3706
      cfg.SetDiskID(dev, instance.primary_node)
3707
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3708
      if is_degr:
3709
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3710

    
3711
    # Step: remove old storage
3712
    self.processor.LogStep(6, steps_total, "removing old storage")
3713
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3714
      info("remove logical volumes for %s" % name)
3715
      for lv in old_lvs:
3716
        cfg.SetDiskID(lv, tgt_node)
3717
        if not rpc.call_blockdev_remove(tgt_node, lv):
3718
          warning("Can't remove old LV", "manually remove unused LVs")
3719
          continue
3720

    
3721
  def _ExecD8Secondary(self, feedback_fn):
3722
    """Replace the secondary node for drbd8.
3723

3724
    The algorithm for replace is quite complicated:
3725
      - for all disks of the instance:
3726
        - create new LVs on the new node with same names
3727
        - shutdown the drbd device on the old secondary
3728
        - disconnect the drbd network on the primary
3729
        - create the drbd device on the new secondary
3730
        - network attach the drbd on the primary, using an artifice:
3731
          the drbd code for Attach() will connect to the network if it
3732
          finds a device which is connected to the good local disks but
3733
          not network enabled
3734
      - wait for sync across all devices
3735
      - remove all disks from the old secondary
3736

3737
    Failures are not very well handled.
3738

3739
    """
3740
    steps_total = 6
3741
    warning, info = (self.processor.LogWarning, self.processor.LogInfo)
3742
    instance = self.instance
3743
    iv_names = {}
3744
    vgname = self.cfg.GetVGName()
3745
    # start of work
3746
    cfg = self.cfg
3747
    old_node = self.tgt_node
3748
    new_node = self.new_node
3749
    pri_node = instance.primary_node
3750

    
3751
    # Step: check device activation
3752
    self.processor.LogStep(1, steps_total, "check device existence")
3753
    info("checking volume groups")
3754
    my_vg = cfg.GetVGName()
3755
    results = rpc.call_vg_list([pri_node, new_node])
3756
    if not results:
3757
      raise errors.OpExecError("Can't list volume groups on the nodes")
3758
    for node in pri_node, new_node:
3759
      res = results.get(node, False)
3760
      if not res or my_vg not in res:
3761
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3762
                                 (my_vg, node))
3763
    for dev in instance.disks:
3764
      if not dev.iv_name in self.op.disks:
3765
        continue
3766
      info("checking %s on %s" % (dev.iv_name, pri_node))
3767
      cfg.SetDiskID(dev, pri_node)
3768
      if not rpc.call_blockdev_find(pri_node, dev):
3769
        raise errors.OpExecError("Can't find device %s on node %s" %
3770
                                 (dev.iv_name, pri_node))
3771

    
3772
    # Step: check other node consistency
3773
    self.processor.LogStep(2, steps_total, "check peer consistency")
3774
    for dev in instance.disks:
3775
      if not dev.iv_name in self.op.disks:
3776
        continue
3777
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3778
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3779
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
3780
                                 " unsafe to replace the secondary" %
3781
                                 pri_node)
3782

    
3783
    # Step: create new storage
3784
    self.processor.LogStep(3, steps_total, "allocate new storage")
3785
    for dev in instance.disks:
3786
      size = dev.size
3787
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3788
      # since we *always* want to create this LV, we use the
3789
      # _Create...OnPrimary (which forces the creation), even if we
3790
      # are talking about the secondary node
3791
      for new_lv in dev.children:
3792
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3793
                                        _GetInstanceInfoText(instance)):
3794
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3795
                                   " node '%s'" %
3796
                                   (new_lv.logical_id[1], new_node))
3797

    
3798
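      # remember the old LV children so that step 6 can remove them
      # from the old secondary once the sync has finished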
      iv_names[dev.iv_name] = (dev, dev.children)
3799

    
3800
    self.processor.LogStep(4, steps_total, "changing drbd configuration")
3801
    for dev in instance.disks:
3802
      size = dev.size
3803
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3804
      # create new devices on new_node
3805
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3806
                              logical_id=(pri_node, new_node,
3807
                                          dev.logical_id[2]),
3808
                              children=dev.children)
3809
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3810
                                        new_drbd, False,
3811
                                      _GetInstanceInfoText(instance)):
3812
        raise errors.OpExecError("Failed to create new DRBD on"
3813
                                 " node '%s'" % new_node)
3814

    
3815
    for dev in instance.disks:
3816
      # we have new devices, shutdown the drbd on the old secondary
3817
      info("shutting down drbd for %s on old node" % dev.iv_name)
3818
      cfg.SetDiskID(dev, old_node)
3819
      if not rpc.call_blockdev_shutdown(old_node, dev):
3820
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3821
                "Please cleanup this device manuall as soon as possible")
3822

    
3823
      # we have new storage, we 'rename' the network on the primary
3824
      info("switching primary drbd for %s to new secondary node" % dev.iv_name)
3825
      cfg.SetDiskID(dev, pri_node)
3826
      # rename to the ip of the new node
3827
      new_uid = list(dev.physical_id)
3828
      new_uid[2] = self.remote_node_info.secondary_ip
3829
      rlist = [(dev, tuple(new_uid))]
3830
      if not rpc.call_blockdev_rename(pri_node, rlist):
3831
        raise errors.OpExecError("Can't detach & re-attach drbd %s on node"
3832
                                 " %s from %s to %s" %
3833
                                 (dev.iv_name, pri_node, old_node, new_node))
3834
      dev.logical_id = (pri_node, new_node, dev.logical_id[2])
3835
      cfg.SetDiskID(dev, pri_node)
3836
      cfg.Update(instance)
3837

    
3838

    
3839
    # this can fail as the old devices are degraded and _WaitForSync
3840
    # does a combined result over all disks, so we don't check its
3841
    # return value
3842
    self.processor.LogStep(5, steps_total, "sync devices")
3843
    _WaitForSync(cfg, instance, unlock=True)
3844

    
3845
    # so check manually all the devices
3846
    for name, (dev, old_lvs) in iv_names.iteritems():
3847
      cfg.SetDiskID(dev, pri_node)
3848
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3849
      if is_degr:
3850
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3851

    
3852
    self.processor.LogStep(6, steps_total, "removing old storage")
3853
    for name, (dev, old_lvs) in iv_names.iteritems():
3854
      info("remove logical volumes for %s" % name)
3855
      for lv in old_lvs:
3856
        cfg.SetDiskID(lv, old_node)
3857
        if not rpc.call_blockdev_remove(old_node, lv):
3858
          warning("Can't remove LV on old secondary",
3859
                  "Cleanup stale volumes by hand")
3860

    
3861
  def Exec(self, feedback_fn):
3862
    """Execute disk replacement.
3863

3864
    This dispatches the disk replacement to the appropriate handler.
3865

3866
    """
3867
    instance = self.instance
3868
    if instance.disk_template == constants.DT_REMOTE_RAID1:
3869
      fn = self._ExecRR1
3870
    elif instance.disk_template == constants.DT_DRBD8:
3871
      if self.op.remote_node is None:
3872
        fn = self._ExecD8DiskOnly
3873
      else:
3874
        fn = self._ExecD8Secondary
3875
    else:
3876
      raise errors.ProgrammerError("Unhandled disk replacement case")
3877
    return fn(feedback_fn)
3878

    
3879

    
3880
class LUQueryInstanceData(NoHooksLU):
3881
  """Query runtime instance data.
3882

3883
  """
3884
  _OP_REQP = ["instances"]
3885

    
3886
  def CheckPrereq(self):
3887
    """Check prerequisites.
3888

3889
    This only checks the optional instance list against the existing names.
3890

3891
    """
3892
    if not isinstance(self.op.instances, list):
3893
      raise errors.OpPrereqError("Invalid argument type 'instances'")
3894
    if self.op.instances:
3895
      self.wanted_instances = []
3896
      names = self.op.instances
3897
      for name in names:
3898
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3899
        if instance is None:
3900
          raise errors.OpPrereqError("No such instance name '%s'" % name)
3901
        self.wanted_instances.append(instance)
3902
    else:
3903
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3904
                               in self.cfg.GetInstanceList()]
3905
    return
3906

    
3907

    
3908
  def _ComputeDiskStatus(self, instance, snode, dev):
3909
    """Compute block device status.
3910

3911
    """
3912
    self.cfg.SetDiskID(dev, instance.primary_node)
3913
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3914
    if dev.dev_type in constants.LDS_DRBD:
3915
      # we change the snode then (otherwise we use the one passed in)
3916
      if dev.logical_id[0] == instance.primary_node:
3917
        snode = dev.logical_id[1]
3918
      else:
3919
        snode = dev.logical_id[0]
3920

    
3921
    if snode:
3922
      self.cfg.SetDiskID(dev, snode)
3923
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
3924
    else:
3925
      dev_sstatus = None
3926

    
3927
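    # recurse into the children so the caller gets the status of the
    # whole device tree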
    if dev.children:
3928
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
3929
                      for child in dev.children]
3930
    else:
3931
      dev_children = []
3932

    
3933
    data = {
3934
      "iv_name": dev.iv_name,
3935
      "dev_type": dev.dev_type,
3936
      "logical_id": dev.logical_id,
3937
      "physical_id": dev.physical_id,
3938
      "pstatus": dev_pstatus,
3939
      "sstatus": dev_sstatus,
3940
      "children": dev_children,
3941
      }
3942

    
3943
    return data
3944

    
3945
  def Exec(self, feedback_fn):
3946
    """Gather and return data"""
3947
    result = {}
3948
    for instance in self.wanted_instances:
3949
      remote_info = rpc.call_instance_info(instance.primary_node,
3950
                                                instance.name)
3951
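      # remote_state is what the hypervisor currently reports,
      # config_state is what the cluster configuration expects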
      if remote_info and "state" in remote_info:
3952
        remote_state = "up"
3953
      else:
3954
        remote_state = "down"
3955
      if instance.status == "down":
3956
        config_state = "down"
3957
      else:
3958
        config_state = "up"
3959

    
3960
      disks = [self._ComputeDiskStatus(instance, None, device)
3961
               for device in instance.disks]
3962

    
3963
      idict = {
3964
        "name": instance.name,
3965
        "config_state": config_state,
3966
        "run_state": remote_state,
3967
        "pnode": instance.primary_node,
3968
        "snodes": instance.secondary_nodes,
3969
        "os": instance.os,
3970
        "memory": instance.memory,
3971
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
3972
        "disks": disks,
3973
        "vcpus": instance.vcpus,
3974
        }
3975

    
3976
      result[instance.name] = idict
3977

    
3978
    return result
3979

    
3980

    
3981
class LUSetInstanceParms(LogicalUnit):
3982
  """Modifies an instances's parameters.
3983

3984
  """
3985
  HPATH = "instance-modify"
3986
  HTYPE = constants.HTYPE_INSTANCE
3987
  _OP_REQP = ["instance_name"]
3988

    
3989
  def BuildHooksEnv(self):
3990
    """Build hooks env.
3991

3992
    This runs on the master, primary and secondaries.
3993

3994
    """
3995
    args = dict()
3996
    if self.mem:
3997
      args['memory'] = self.mem
3998
    if self.vcpus:
3999
      args['vcpus'] = self.vcpus
4000
    if self.do_ip or self.do_bridge:
4001
      if self.do_ip:
4002
        ip = self.ip
4003
      else:
4004
        ip = self.instance.nics[0].ip
4005
      if self.bridge:
4006
        bridge = self.bridge
4007
      else:
4008
        bridge = self.instance.nics[0].bridge
4009
      args['nics'] = [(ip, bridge)]
4010
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4011
    nl = [self.sstore.GetMasterNode(),
4012
          self.instance.primary_node] + list(self.instance.secondary_nodes)
4013
    return env, nl, nl
4014

    
4015
  def CheckPrereq(self):
4016
    """Check prerequisites.
4017

4018
    This checks the submitted parameters and that the instance exists.
4019

4020
    """
4021
    self.mem = getattr(self.op, "mem", None)
4022
    self.vcpus = getattr(self.op, "vcpus", None)
4023
    self.ip = getattr(self.op, "ip", None)
4024
    self.bridge = getattr(self.op, "bridge", None)
4025
    if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
4026
      raise errors.OpPrereqError("No changes submitted")
4027
    if self.mem is not None:
4028
      try:
4029
        self.mem = int(self.mem)
4030
      except ValueError, err:
4031
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4032
    if self.vcpus is not None:
4033
      try:
4034
        self.vcpus = int(self.vcpus)
4035
      except ValueError, err:
4036
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4037
    if self.ip is not None:
4038
      self.do_ip = True
4039
      if self.ip.lower() == "none":
4040
        self.ip = None
4041
      else:
4042
        if not utils.IsValidIP(self.ip):
4043
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4044
    else:
4045
      self.do_ip = False
4046
    self.do_bridge = (self.bridge is not None)
4047

    
4048
    instance = self.cfg.GetInstanceInfo(
4049
      self.cfg.ExpandInstanceName(self.op.instance_name))
4050
    if instance is None:
4051
      raise errors.OpPrereqError("No such instance name '%s'" %
4052
                                 self.op.instance_name)
4053
    self.op.instance_name = instance.name
4054
    self.instance = instance
4055
    return
4056

    
4057
  def Exec(self, feedback_fn):
4058
    """Modifies an instance.
4059

4060
    All parameters take effect only at the next restart of the instance.
4061
    """
4062
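    # collect (parameter, new value) pairs so the caller can report
    # exactly what was changed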
    result = []
4063
    instance = self.instance
4064
    if self.mem:
4065
      instance.memory = self.mem
4066
      result.append(("mem", self.mem))
4067
    if self.vcpus:
4068
      instance.vcpus = self.vcpus
4069
      result.append(("vcpus",  self.vcpus))
4070
    if self.do_ip:
4071
      instance.nics[0].ip = self.ip
4072
      result.append(("ip", self.ip))
4073
    if self.bridge:
4074
      instance.nics[0].bridge = self.bridge
4075
      result.append(("bridge", self.bridge))
4076

    
4077
    self.cfg.AddInstance(instance)
4078

    
4079
    return result
4080

    
4081

    
4082
class LUQueryExports(NoHooksLU):
4083
  """Query the exports list
4084

4085
  """
4086
  _OP_REQP = []
4087

    
4088
  def CheckPrereq(self):
4089
    """Check that the nodelist contains only existing nodes.
4090

4091
    """
4092
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4093

    
4094
  def Exec(self, feedback_fn):
4095
    """Compute the list of all the exported system images.
4096

4097
    Returns:
4098
      a dictionary with the structure node->(export-list)
4099
      where export-list is a list of the instances exported on
4100
      that node.
4101

4102
    """
4103
    return rpc.call_export_list(self.nodes)
4104

    
4105

    
4106
class LUExportInstance(LogicalUnit):
4107
  """Export an instance to an image in the cluster.
4108

4109
  """
4110
  HPATH = "instance-export"
4111
  HTYPE = constants.HTYPE_INSTANCE
4112
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
4113

    
4114
  def BuildHooksEnv(self):
4115
    """Build hooks env.
4116

4117
    This will run on the master, primary node and target node.
4118

4119
    """
4120
    env = {
4121
      "EXPORT_NODE": self.op.target_node,
4122
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4123
      }
4124
    env.update(_BuildInstanceHookEnvByObject(self.instance))
4125
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4126
          self.op.target_node]
4127
    return env, nl, nl
4128

    
4129
  def CheckPrereq(self):
4130
    """Check prerequisites.
4131

4132
    This checks that the instance name is a valid one.
4133

4134
    """
4135
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4136
    self.instance = self.cfg.GetInstanceInfo(instance_name)
4137
    if self.instance is None:
4138
      raise errors.OpPrereqError("Instance '%s' not found" %
4139
                                 self.op.instance_name)
4140

    
4141
    # node verification
4142
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4143
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4144

    
4145
    if self.dst_node is None:
4146
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
4147
                                 self.op.target_node)
4148
    self.op.target_node = self.dst_node.name
4149

    
4150
  def Exec(self, feedback_fn):
4151
    """Export an instance to an image in the cluster.
4152

4153
    """
4154
    instance = self.instance
4155
    dst_node = self.dst_node
4156
    src_node = instance.primary_node
4157
    # shutdown the instance, unless requested not to do so
4158
    if self.op.shutdown:
4159
      op = opcodes.OpShutdownInstance(instance_name=instance.name)
4160
      self.processor.ChainOpCode(op)
4161

    
4162
    vgname = self.cfg.GetVGName()
4163

    
4164
    snap_disks = []
4165

    
4166
    try:
4167
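      # only "sda" is snapshotted and exported; the other disks (i.e.
      # the swap device) are not part of the backup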
      for disk in instance.disks:
4168
        if disk.iv_name == "sda":
4169
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4170
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4171

    
4172
          if not new_dev_name:
4173
            logger.Error("could not snapshot block device %s on node %s" %
4174
                         (disk.logical_id[1], src_node))
4175
          else:
4176
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4177
                                      logical_id=(vgname, new_dev_name),
4178
                                      physical_id=(vgname, new_dev_name),
4179
                                      iv_name=disk.iv_name)
4180
            snap_disks.append(new_dev)
4181

    
4182
    finally:
4183
      if self.op.shutdown:
4184
        op = opcodes.OpStartupInstance(instance_name=instance.name,
4185
                                       force=False)
4186
        self.processor.ChainOpCode(op)
4187

    
4188
    # TODO: check for size
4189

    
4190
    for dev in snap_disks:
4191
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
4192
                                           instance):
4193
        logger.Error("could not export block device %s from node"
4194
                     " %s to node %s" %
4195
                     (dev.logical_id[1], src_node, dst_node.name))
4196
      if not rpc.call_blockdev_remove(src_node, dev):
4197
        logger.Error("could not remove snapshot block device %s from"
4198
                     " node %s" % (dev.logical_id[1], src_node))
4199

    
4200
    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4201
      logger.Error("could not finalize export for instance %s on node %s" %
4202
                   (instance.name, dst_node.name))
4203

    
4204
    nodelist = self.cfg.GetNodeList()
4205
    nodelist.remove(dst_node.name)
4206

    
4207
    # on one-node clusters nodelist will be empty after the removal
4208
    # if we proceed the backup would be removed because OpQueryExports
4209
    # substitutes an empty list with the full cluster node list.
4210
    if nodelist:
4211
      op = opcodes.OpQueryExports(nodes=nodelist)
4212
      exportlist = self.processor.ChainOpCode(op)
4213
      for node in exportlist:
4214
        if instance.name in exportlist[node]:
4215
          if not rpc.call_export_remove(node, instance.name):
4216
            logger.Error("could not remove older export for instance %s"
4217
                         " on node %s" % (instance.name, node))
4218

    
4219

    
4220
class TagsLU(NoHooksLU):
4221
  """Generic tags LU.
4222

4223
  This is an abstract class which is the parent of all the other tags LUs.
4224

4225
  """
4226
  def CheckPrereq(self):
4227
    """Check prerequisites.
4228

4229
    """
4230
    if self.op.kind == constants.TAG_CLUSTER:
4231
      self.target = self.cfg.GetClusterInfo()
4232
    elif self.op.kind == constants.TAG_NODE:
4233
      name = self.cfg.ExpandNodeName(self.op.name)
4234
      if name is None:
4235
        raise errors.OpPrereqError("Invalid node name (%s)" %
4236
                                   (self.op.name,))
4237
      self.op.name = name
4238
      self.target = self.cfg.GetNodeInfo(name)
4239
    elif self.op.kind == constants.TAG_INSTANCE:
4240
      name = self.cfg.ExpandInstanceName(self.op.name)
4241
      if name is None:
4242
        raise errors.OpPrereqError("Invalid instance name (%s)" %
4243
                                   (self.op.name,))
4244
      self.op.name = name
4245
      self.target = self.cfg.GetInstanceInfo(name)
4246
    else:
4247
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4248
                                 str(self.op.kind))
4249

    
4250

    
4251
class LUGetTags(TagsLU):
4252
  """Returns the tags of a given object.
4253

4254
  """
4255
  _OP_REQP = ["kind", "name"]
4256

    
4257
  def Exec(self, feedback_fn):
4258
    """Returns the tag list.
4259

4260
    """
4261
    return self.target.GetTags()
4262

    
4263

    
4264
class LUSearchTags(NoHooksLU):
4265
  """Searches the tags for a given pattern.
4266

4267
  """
4268
  _OP_REQP = ["pattern"]
4269

    
4270
  def CheckPrereq(self):
4271
    """Check prerequisites.
4272

4273
    This checks the pattern passed for validity by compiling it.
4274

4275
    """
4276
    try:
4277
      self.re = re.compile(self.op.pattern)
4278
    except re.error, err:
4279
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4280
                                 (self.op.pattern, err))
4281

    
4282
  def Exec(self, feedback_fn):
4283
    """Returns the tag list.
4284

4285
    """
4286
    cfg = self.cfg
4287
    tgts = [("/cluster", cfg.GetClusterInfo())]
4288
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4289
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4290
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4291
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4292
    results = []
4293
    for path, target in tgts:
4294
      for tag in target.GetTags():
4295
        if self.re.search(tag):
4296
          results.append((path, tag))
4297
    return results
4298

    
4299

    
4300
class LUAddTags(TagsLU):
4301
  """Sets a tag on a given object.
4302

4303
  """
4304
  _OP_REQP = ["kind", "name", "tags"]
4305

    
4306
  def CheckPrereq(self):
4307
    """Check prerequisites.
4308

4309
    This checks the type and length of the tag name and value.
4310

4311
    """
4312
    TagsLU.CheckPrereq(self)
4313
    for tag in self.op.tags:
4314
      objects.TaggableObject.ValidateTag(tag)
4315

    
4316
  def Exec(self, feedback_fn):
4317
    """Sets the tag.
4318

4319
    """
4320
    try:
4321
      for tag in self.op.tags:
4322
        self.target.AddTag(tag)
4323
    except errors.TagError, err:
4324
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
4325
    try:
4326
      self.cfg.Update(self.target)
4327
    except errors.ConfigurationError:
4328
      raise errors.OpRetryError("There has been a modification to the"
4329
                                " config file and the operation has been"
4330
                                " aborted. Please retry.")
4331

    
4332

    
4333
class LUDelTags(TagsLU):
4334
  """Delete a list of tags from a given object.
4335

4336
  """
4337
  _OP_REQP = ["kind", "name", "tags"]
4338

    
4339
  def CheckPrereq(self):
4340
    """Check prerequisites.
4341

4342
    This checks that we have the given tag.
4343

4344
    """
4345
    TagsLU.CheckPrereq(self)
4346
    for tag in self.op.tags:
4347
      objects.TaggableObject.ValidateTag(tag)
4348
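    # every requested tag must currently exist; report the missing ones
    # sorted so the error message is deterministic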
    del_tags = frozenset(self.op.tags)
4349
    cur_tags = self.target.GetTags()
4350
    if not del_tags <= cur_tags:
4351
      diff_tags = del_tags - cur_tags
4352
      diff_names = ["'%s'" % tag for tag in diff_tags]
4353
      diff_names.sort()
4354
      raise errors.OpPrereqError("Tag(s) %s not found" %
4355
                                 (",".join(diff_names)))
4356

    
4357
  def Exec(self, feedback_fn):
4358
    """Remove the tag from the object.
4359

4360
    """
4361
    for tag in self.op.tags:
4362
      self.target.RemoveTag(tag)
4363
    try:
4364
      self.cfg.Update(self.target)
4365
    except errors.ConfigurationError:
4366
      raise errors.OpRetryError("There has been a modification to the"
4367
                                " config file and the operation has been"
4368
                                " aborted. Please retry.")