
root / lib / cmdlib.py @ 65fe4693


1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33

    
34
from ganeti import rpc
35
from ganeti import ssh
36
from ganeti import logger
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import config
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import opcodes
44
from ganeti import ssconf
45

    
46
class LogicalUnit(object):
47
  """Logical Unit base class.
48

49
  Subclasses must follow these rules:
50
    - implement CheckPrereq which also fills in the opcode instance
51
      with all the fields (even if as None)
52
    - implement Exec
53
    - implement BuildHooksEnv
54
    - redefine HPATH and HTYPE
55
    - optionally redefine their run requirements (REQ_CLUSTER,
56
      REQ_MASTER); note that all commands require root permissions
57

58
  """
59
  HPATH = None
60
  HTYPE = None
61
  _OP_REQP = []
62
  REQ_CLUSTER = True
63
  REQ_MASTER = True
64

    
65
  def __init__(self, processor, op, cfg, sstore):
66
    """Constructor for LogicalUnit.
67

68
    This needs to be overridden in derived classes in order to check op
69
    validity.
70

71
    """
72
    self.processor = processor
73
    self.op = op
74
    self.cfg = cfg
75
    self.sstore = sstore
76
    for attr_name in self._OP_REQP:
77
      attr_val = getattr(op, attr_name, None)
78
      if attr_val is None:
79
        raise errors.OpPrereqError("Required parameter '%s' missing" %
80
                                   attr_name)
81
    if self.REQ_CLUSTER:
82
      if not cfg.IsCluster():
83
        raise errors.OpPrereqError("Cluster not initialized yet,"
84
                                   " use 'gnt-cluster init' first.")
85
      if self.REQ_MASTER:
86
        master = sstore.GetMasterNode()
87
        if master != utils.HostInfo().name:
88
          raise errors.OpPrereqError("Commands must be run on the master"
89
                                     " node %s" % master)
90

    
91
  def CheckPrereq(self):
92
    """Check prerequisites for this LU.
93

94
    This method should check that the prerequisites for the execution
95
    of this LU are fulfilled. It can do internode communication, but
96
    it should be idempotent - no cluster or system changes are
97
    allowed.
98

99
    The method should raise errors.OpPrereqError in case something is
100
    not fulfilled. Its return value is ignored.
101

102
    This method should also update all the parameters of the opcode to
103
    their canonical form; e.g. a short node name must be fully
104
    expanded after this method has successfully completed (so that
105
    hooks, logging, etc. work correctly).
106

107
    """
108
    raise NotImplementedError
109

    
110
  def Exec(self, feedback_fn):
111
    """Execute the LU.
112

113
    This method should implement the actual work. It should raise
114
    errors.OpExecError for failures that are somewhat dealt with in
115
    code, or expected.
116

117
    """
118
    raise NotImplementedError
119

    
120
  def BuildHooksEnv(self):
121
    """Build hooks environment for this LU.
122

123
    This method should return a three-element tuple consisting of: a dict
124
    containing the environment that will be used for running the
125
    specific hook for this LU, a list of node names on which the hook
126
    should run before the execution, and a list of node names on which
127
    the hook should run after the execution.
128

129
    The keys of the dict must not have 'GANETI_' prefixed as this will
130
    be handled in the hooks runner. Also note additional keys will be
131
    added by the hooks runner. If the LU doesn't define any
132
    environment, an empty dict (and not None) should be returned.
133

134
    As for the node lists, the master should not be included in
135
    them, as it will be added by the hooks runner in case this LU
136
    requires a cluster to run on (otherwise we don't have a node
137
    list). "No nodes" should be represented as an empty list (and not
138
    None).
139

140
    Note that if the HPATH for a LU class is None, this function will
141
    not be called.
142

143
    """
144
    raise NotImplementedError
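
# Illustrative sketch, not part of the original module: a minimal LU that
# follows the contract described in the LogicalUnit docstring above. The
# class name and the "message" opcode field are hypothetical and exist
# only to show where CheckPrereq, Exec and BuildHooksEnv fit in.
class LUExampleEcho(LogicalUnit):
  """Example-only LU that echoes a message through the feedback function."""
  HPATH = "example-echo"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["message"]

  def BuildHooksEnv(self):
    # (environment dict, nodes for the pre-hook, nodes for the post-hook)
    env = {"OP_TARGET": self.op.message}
    return env, [], []

  def CheckPrereq(self):
    # prerequisite checks must stay idempotent; canonicalise opcode
    # fields here so that hooks and logging see the final values
    if not self.op.message:
      raise errors.OpPrereqError("Empty message given")

  def Exec(self, feedback_fn):
    feedback_fn("echo: %s" % self.op.message)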
145

    
146

    
147
class NoHooksLU(LogicalUnit):
148
  """Simple LU which runs no hooks.
149

150
  This LU is intended as a parent for other LogicalUnits which will
151
  run no hooks, in order to reduce duplicate code.
152

153
  """
154
  HPATH = None
155
  HTYPE = None
156

    
157
  def BuildHooksEnv(self):
158
    """Build hooks env.
159

160
    This is a no-op, since we don't run hooks.
161

162
    """
163
    return {}, [], []
164

    
165

    
166
def _GetWantedNodes(lu, nodes):
167
  """Returns list of checked and expanded node names.
168

169
  Args:
170
    nodes: List of node names (strings); an empty list means all nodes
171

172
  """
173
  if not isinstance(nodes, list):
174
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
175

    
176
  if nodes:
177
    wanted = []
178

    
179
    for name in nodes:
180
      node = lu.cfg.ExpandNodeName(name)
181
      if node is None:
182
        raise errors.OpPrereqError("No such node name '%s'" % name)
183
      wanted.append(node)
184

    
185
  else:
186
    wanted = lu.cfg.GetNodeList()
187
  return utils.NiceSort(wanted)
188

    
189

    
190
def _GetWantedInstances(lu, instances):
191
  """Returns list of checked and expanded instance names.
192

193
  Args:
194
    instances: List of instance names (strings); an empty list means all
195

196
  """
197
  if not isinstance(instances, list):
198
    raise errors.OpPrereqError("Invalid argument type 'instances'")
199

    
200
  if instances:
201
    wanted = []
202

    
203
    for name in instances:
204
      instance = lu.cfg.ExpandInstanceName(name)
205
      if instance is None:
206
        raise errors.OpPrereqError("No such instance name '%s'" % name)
207
      wanted.append(instance)
208

    
209
  else:
210
    wanted = lu.cfg.GetInstanceList()
211
  return utils.NiceSort(wanted)
212

    
213

    
214
def _CheckOutputFields(static, dynamic, selected):
215
  """Checks whether all selected fields are valid.
216

217
  Args:
218
    static: Static fields
219
    dynamic: Dynamic fields
    selected: Fields selected for output (validated against the above)
220

221
  """
222
  static_fields = frozenset(static)
223
  dynamic_fields = frozenset(dynamic)
224

    
225
  all_fields = static_fields | dynamic_fields
226

    
227
  if not all_fields.issuperset(selected):
228
    raise errors.OpPrereqError("Unknown output fields selected: %s"
229
                               % ",".join(frozenset(selected).
230
                                          difference(all_fields)))
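
# Illustrative sketch, not part of the original module: typical use of
# _CheckOutputFields from an LU's CheckPrereq (the field names and the
# op object are hypothetical).
def _ExampleCheckOutputFields(op):
  """Example-only validation of user-requested output fields."""
  # raises OpPrereqError if op.output_fields contains anything outside
  # the union of the static and dynamic field sets
  _CheckOutputFields(static=["name", "pip", "sip"],
                     dynamic=["mfree", "dtotal"],
                     selected=op.output_fields)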
231

    
232

    
233
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
234
                          memory, vcpus, nics):
235
  """Builds instance related env variables for hooks from single variables.
236

237
  Args:
238
    secondary_nodes: List of secondary nodes as strings
239
  """
240
  env = {
241
    "OP_TARGET": name,
242
    "INSTANCE_NAME": name,
243
    "INSTANCE_PRIMARY": primary_node,
244
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
245
    "INSTANCE_OS_TYPE": os_type,
246
    "INSTANCE_STATUS": status,
247
    "INSTANCE_MEMORY": memory,
248
    "INSTANCE_VCPUS": vcpus,
249
  }
250

    
251
  if nics:
252
    nic_count = len(nics)
253
    for idx, (ip, bridge) in enumerate(nics):
254
      if ip is None:
255
        ip = ""
256
      env["INSTANCE_NIC%d_IP" % idx] = ip
257
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
258
  else:
259
    nic_count = 0
260

    
261
  env["INSTANCE_NIC_COUNT"] = nic_count
262

    
263
  return env
264

    
265

    
266
def _BuildInstanceHookEnvByObject(instance, override=None):
267
  """Builds instance related env variables for hooks from an object.
268

269
  Args:
270
    instance: objects.Instance object of instance
271
    override: dict of values to override
272
  """
273
  args = {
274
    'name': instance.name,
275
    'primary_node': instance.primary_node,
276
    'secondary_nodes': instance.secondary_nodes,
277
    'os_type': instance.os,
278
    'status': instance.status,
279
    'memory': instance.memory,
280
    'vcpus': instance.vcpus,
281
    'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
282
  }
283
  if override:
284
    args.update(override)
285
  return _BuildInstanceHookEnv(**args)
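
# Illustrative sketch, not part of the original module: the environment
# built by the helpers above for a hypothetical two-NIC instance (all
# names and values are made up).
_EXAMPLE_INSTANCE_HOOK_ENV = _BuildInstanceHookEnv(
  name="instance1.example.com",
  primary_node="node1.example.com",
  secondary_nodes=["node2.example.com"],
  os_type="debian-etch",
  status="up",
  memory=512,
  vcpus=1,
  nics=[("192.0.2.10", "xen-br0"), (None, "xen-br0")])
# Resulting keys include OP_TARGET, INSTANCE_NAME, INSTANCE_PRIMARY,
# INSTANCE_SECONDARIES, INSTANCE_NIC_COUNT (2), INSTANCE_NIC0_IP,
# INSTANCE_NIC0_BRIDGE and INSTANCE_NIC1_IP ("" for the unset IP).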
286

    
287

    
288
def _UpdateEtcHosts(fullnode, ip):
289
  """Ensure a node has a correct entry in /etc/hosts.
290

291
  Args:
292
    fullnode - Fully qualified domain name of host. (str)
293
    ip       - IPv4 address of host (str)
294

295
  """
296
  node = fullnode.split(".", 1)[0]
297

    
298
  f = open('/etc/hosts', 'r+')
299

    
300
  inthere = False
301

    
302
  save_lines = []
303
  add_lines = []
304
  removed = False
305

    
306
  while True:
307
    rawline = f.readline()
308

    
309
    if not rawline:
310
      # End of file
311
      break
312

    
313
    line = rawline.split('\n')[0]
314

    
315
    # Strip off comments
316
    line = line.split('#')[0]
317

    
318
    if not line:
319
      # Entire line was comment, skip
320
      save_lines.append(rawline)
321
      continue
322

    
323
    fields = line.split()
324

    
325
    haveall = True
326
    havesome = False
327
    for spec in [ ip, fullnode, node ]:
328
      if spec not in fields:
329
        haveall = False
330
      if spec in fields:
331
        havesome = True
332

    
333
    if haveall:
334
      inthere = True
335
      save_lines.append(rawline)
336
      continue
337

    
338
    if havesome and not haveall:
339
      # Line (old, or manual?) which is missing some.  Remove.
340
      removed = True
341
      continue
342

    
343
    save_lines.append(rawline)
344

    
345
  if not inthere:
346
    add_lines.append('%s\t%s %s\n' % (ip, fullnode, node))
347

    
348
  if removed:
349
    if add_lines:
350
      save_lines = save_lines + add_lines
351

    
352
    # We removed a line, write a new file and replace old.
353
    fd, tmpname = tempfile.mkstemp('tmp', 'hosts_', '/etc')
354
    newfile = os.fdopen(fd, 'w')
355
    newfile.write(''.join(save_lines))
356
    newfile.close()
357
    os.rename(tmpname, '/etc/hosts')
358

    
359
  elif add_lines:
360
    # Simply appending a new line will do the trick.
361
    f.seek(0, 2)
362
    for add in add_lines:
363
      f.write(add)
364

    
365
  f.close()
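
# Illustrative sketch, not part of the original module: the /etc/hosts
# line format maintained by _UpdateEtcHosts above, for a hypothetical
# host (the IP and names are made up).
_EXAMPLE_HOSTS_LINE = '%s\t%s %s\n' % ('192.0.2.1',
                                       'node1.example.com', 'node1')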
366

    
367

    
368
def _UpdateKnownHosts(fullnode, ip, pubkey):
369
  """Ensure a node has a correct known_hosts entry.
370

371
  Args:
372
    fullnode - Fully qualified domain name of host. (str)
373
    ip       - IPv4 address of host (str)
374
    pubkey   - the public key of the cluster
375

376
  """
377
  if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
378
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
379
  else:
380
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
381

    
382
  inthere = False
383

    
384
  save_lines = []
385
  add_lines = []
386
  removed = False
387

    
388
  for rawline in f:
389
    logger.Debug('read %s' % (repr(rawline),))
390

    
391
    parts = rawline.rstrip('\r\n').split()
392

    
393
    # Ignore unwanted lines
394
    if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
395
      fields = parts[0].split(',')
396
      key = parts[2]
397

    
398
      haveall = True
399
      havesome = False
400
      for spec in [ ip, fullnode ]:
401
        if spec not in fields:
402
          haveall = False
403
        if spec in fields:
404
          havesome = True
405

    
406
      logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
407
      if haveall and key == pubkey:
408
        inthere = True
409
        save_lines.append(rawline)
410
        logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
411
        continue
412

    
413
      if havesome and (not haveall or key != pubkey):
414
        removed = True
415
        logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
416
        continue
417

    
418
    save_lines.append(rawline)
419

    
420
  if not inthere:
421
    add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
422
    logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
423

    
424
  if removed:
425
    save_lines = save_lines + add_lines
426

    
427
    # Write a new file and replace old.
428
    fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
429
                                   constants.DATA_DIR)
430
    newfile = os.fdopen(fd, 'w')
431
    try:
432
      newfile.write(''.join(save_lines))
433
    finally:
434
      newfile.close()
435
    logger.Debug("Wrote new known_hosts.")
436
    os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
437

    
438
  elif add_lines:
439
    # Simply appending a new line will do the trick.
440
    f.seek(0, 2)
441
    for add in add_lines:
442
      f.write(add)
443

    
444
  f.close()
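
# Illustrative sketch, not part of the original module: the known_hosts
# line format written by _UpdateKnownHosts above (the host name, IP and
# key material are made up).
_EXAMPLE_KNOWN_HOSTS_LINE = ('%s,%s ssh-rsa %s\n' %
                             ('node1.example.com', '192.0.2.1',
                              'AAAAB3NzaC1yc2E...'))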
445

    
446

    
447
def _HasValidVG(vglist, vgname):
448
  """Checks if the volume group list is valid.
449

450
  A non-None return value means there's an error, and the return value
451
  is the error message.
452

453
  """
454
  vgsize = vglist.get(vgname, None)
455
  if vgsize is None:
456
    return "volume group '%s' missing" % vgname
457
  elif vgsize < 20480:
458
    return ("volume group '%s' too small (20480MiB required, %dMib found)" %
459
            (vgname, vgsize))
460
  return None
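
# Illustrative sketch, not part of the original module: _HasValidVG works
# on a mapping of volume group name to size in MiB, as returned by
# utils.ListVolumeGroups(). The group names and sizes below are made up.
def _ExampleVGCheck():
  """Example-only check of a volume group mapping."""
  vglist = {"xenvg": 102400, "tinyvg": 1024}
  assert _HasValidVG(vglist, "xenvg") is None        # large enough
  assert _HasValidVG(vglist, "tinyvg") is not None   # below 20480 MiB
  assert _HasValidVG(vglist, "missing") is not None  # not present at all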
461

    
462

    
463
def _InitSSHSetup(node):
464
  """Setup the SSH configuration for the cluster.
465

466

467
  This generates a dsa keypair for root, adds the pub key to the
468
  permitted hosts and adds the hostkey to its own known hosts.
469

470
  Args:
471
    node: the name of this host as a fqdn
472

473
  """
474
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
475

    
476
  for name in priv_key, pub_key:
477
    if os.path.exists(name):
478
      utils.CreateBackup(name)
479
    utils.RemoveFile(name)
480

    
481
  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
482
                         "-f", priv_key,
483
                         "-q", "-N", ""])
484
  if result.failed:
485
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
486
                             result.output)
487

    
488
  f = open(pub_key, 'r')
489
  try:
490
    utils.AddAuthorizedKey(auth_keys, f.read(8192))
491
  finally:
492
    f.close()
493

    
494

    
495
def _InitGanetiServerSetup(ss):
496
  """Setup the necessary configuration for the initial node daemon.
497

498
  This creates the nodepass file containing the shared password for
499
  the cluster and also generates the SSL certificate.
500

501
  """
502
  # Create pseudo random password
503
  randpass = sha.new(os.urandom(64)).hexdigest()
504
  # and write it into sstore
505
  ss.SetKey(ss.SS_NODED_PASS, randpass)
506

    
507
  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
508
                         "-days", str(365*5), "-nodes", "-x509",
509
                         "-keyout", constants.SSL_CERT_FILE,
510
                         "-out", constants.SSL_CERT_FILE, "-batch"])
511
  if result.failed:
512
    raise errors.OpExecError("could not generate server ssl cert, command"
513
                             " %s had exitcode %s and error message %s" %
514
                             (result.cmd, result.exit_code, result.output))
515

    
516
  os.chmod(constants.SSL_CERT_FILE, 0400)
517

    
518
  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
519

    
520
  if result.failed:
521
    raise errors.OpExecError("Could not start the node daemon, command %s"
522
                             " had exitcode %s and error %s" %
523
                             (result.cmd, result.exit_code, result.output))
524

    
525

    
526
def _CheckInstanceBridgesExist(instance):
527
  """Check that the brigdes needed by an instance exist.
528

529
  """
530
  # check bridges existence
531
  brlist = [nic.bridge for nic in instance.nics]
532
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
533
    raise errors.OpPrereqError("one or more target bridges %s does not"
534
                               " exist on destination node '%s'" %
535
                               (brlist, instance.primary_node))
536

    
537

    
538
class LUInitCluster(LogicalUnit):
539
  """Initialise the cluster.
540

541
  """
542
  HPATH = "cluster-init"
543
  HTYPE = constants.HTYPE_CLUSTER
544
  _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
545
              "def_bridge", "master_netdev"]
546
  REQ_CLUSTER = False
547

    
548
  def BuildHooksEnv(self):
549
    """Build hooks env.
550

551
    Notes: Since we don't require a cluster, we must manually add
552
    ourselves in the post-run node list.
553

554
    """
555
    env = {"OP_TARGET": self.op.cluster_name}
556
    return env, [], [self.hostname.name]
557

    
558
  def CheckPrereq(self):
559
    """Verify that the passed name is a valid one.
560

561
    """
562
    if config.ConfigWriter.IsCluster():
563
      raise errors.OpPrereqError("Cluster is already initialised")
564

    
565
    self.hostname = hostname = utils.HostInfo()
566

    
567
    if hostname.ip.startswith("127."):
568
      raise errors.OpPrereqError("This host's IP resolves to the private"
569
                                 " range (%s). Please fix DNS or /etc/hosts." %
570
                                 (hostname.ip,))
571

    
572
    self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
573

    
574
    if not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, hostname.ip,
575
                         constants.DEFAULT_NODED_PORT):
576
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
577
                                 " to %s,\nbut this ip address does not"
578
                                 " belong to this host."
579
                                 " Aborting." % hostname.ip)
580

    
581
    secondary_ip = getattr(self.op, "secondary_ip", None)
582
    if secondary_ip and not utils.IsValidIP(secondary_ip):
583
      raise errors.OpPrereqError("Invalid secondary ip given")
584
    if (secondary_ip and
585
        secondary_ip != hostname.ip and
586
        (not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, secondary_ip,
587
                           constants.DEFAULT_NODED_PORT))):
588
      raise errors.OpPrereqError("You gave %s as secondary IP,\n"
589
                                 "but it does not belong to this host." %
590
                                 secondary_ip)
591
    self.secondary_ip = secondary_ip
592

    
593
    # checks presence of the volume group given
594
    vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
595

    
596
    if vgstatus:
597
      raise errors.OpPrereqError("Error: %s" % vgstatus)
598

    
599
    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
600
                    self.op.mac_prefix):
601
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
602
                                 self.op.mac_prefix)
603

    
604
    if self.op.hypervisor_type not in hypervisor.VALID_HTYPES:
605
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
606
                                 self.op.hypervisor_type)
607

    
608
    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
609
    if result.failed:
610
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
611
                                 (self.op.master_netdev,
612
                                  result.output.strip()))
613

    
614
  def Exec(self, feedback_fn):
615
    """Initialize the cluster.
616

617
    """
618
    clustername = self.clustername
619
    hostname = self.hostname
620

    
621
    # set up the simple store
622
    self.sstore = ss = ssconf.SimpleStore()
623
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
624
    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
625
    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
626
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
627
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
628

    
629
    # set up the inter-node password and certificate
630
    _InitGanetiServerSetup(ss)
631

    
632
    # start the master ip
633
    rpc.call_node_start_master(hostname.name)
634

    
635
    # set up ssh config and /etc/hosts
636
    f = open(constants.SSH_HOST_RSA_PUB, 'r')
637
    try:
638
      sshline = f.read()
639
    finally:
640
      f.close()
641
    sshkey = sshline.split(" ")[1]
642

    
643
    _UpdateEtcHosts(hostname.name, hostname.ip)
644

    
645
    _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
646

    
647
    _InitSSHSetup(hostname.name)
648

    
649
    # init of cluster config file
650
    self.cfg = cfgw = config.ConfigWriter()
651
    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
652
                    sshkey, self.op.mac_prefix,
653
                    self.op.vg_name, self.op.def_bridge)
654

    
655

    
656
class LUDestroyCluster(NoHooksLU):
657
  """Logical unit for destroying the cluster.
658

659
  """
660
  _OP_REQP = []
661

    
662
  def CheckPrereq(self):
663
    """Check prerequisites.
664

665
    This checks whether the cluster is empty.
666

667
    Any errors are signalled by raising errors.OpPrereqError.
668

669
    """
670
    master = self.sstore.GetMasterNode()
671

    
672
    nodelist = self.cfg.GetNodeList()
673
    if len(nodelist) != 1 or nodelist[0] != master:
674
      raise errors.OpPrereqError("There are still %d node(s) in"
675
                                 " this cluster." % (len(nodelist) - 1))
676
    instancelist = self.cfg.GetInstanceList()
677
    if instancelist:
678
      raise errors.OpPrereqError("There are still %d instance(s) in"
679
                                 " this cluster." % len(instancelist))
680

    
681
  def Exec(self, feedback_fn):
682
    """Destroys the cluster.
683

684
    """
685
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
686
    utils.CreateBackup(priv_key)
687
    utils.CreateBackup(pub_key)
688
    rpc.call_node_leave_cluster(self.sstore.GetMasterNode())
689

    
690

    
691
class LUVerifyCluster(NoHooksLU):
692
  """Verifies the cluster status.
693

694
  """
695
  _OP_REQP = []
696

    
697
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
698
                  remote_version, feedback_fn):
699
    """Run multiple tests against a node.
700

701
    Test list:
702
      - compares ganeti version
703
      - checks vg existence and size > 20G
704
      - checks config file checksum
705
      - checks ssh to other nodes
706

707
    Args:
708
      node: name of the node to check
709
      file_list: required list of files
710
      local_cksum: dictionary of local files and their checksums
711

712
    """
713
    # compares ganeti version
714
    local_version = constants.PROTOCOL_VERSION
715
    if not remote_version:
716
      feedback_fn(" - ERROR: connection to %s failed" % (node))
717
      return True
718

    
719
    if local_version != remote_version:
720
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
721
                      (local_version, node, remote_version))
722
      return True
723

    
724
    # checks vg existence and size > 20G
725

    
726
    bad = False
727
    if not vglist:
728
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
729
                      (node,))
730
      bad = True
731
    else:
732
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
733
      if vgstatus:
734
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
735
        bad = True
736

    
737
    # checks config file checksum
738
    # checks ssh to any
739

    
740
    if 'filelist' not in node_result:
741
      bad = True
742
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
743
    else:
744
      remote_cksum = node_result['filelist']
745
      for file_name in file_list:
746
        if file_name not in remote_cksum:
747
          bad = True
748
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
749
        elif remote_cksum[file_name] != local_cksum[file_name]:
750
          bad = True
751
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
752

    
753
    if 'nodelist' not in node_result:
754
      bad = True
755
      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
756
    else:
757
      if node_result['nodelist']:
758
        bad = True
759
        for node in node_result['nodelist']:
760
          feedback_fn("  - ERROR: communication with node '%s': %s" %
761
                          (node, node_result['nodelist'][node]))
762
    hyp_result = node_result.get('hypervisor', None)
763
    if hyp_result is not None:
764
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
765
    return bad
766

    
767
  def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
768
    """Verify an instance.
769

770
    This function checks to see if the required block devices are
771
    available on the instance's node.
772

773
    """
774
    bad = False
775

    
776
    instancelist = self.cfg.GetInstanceList()
777
    if not instance in instancelist:
778
      feedback_fn("  - ERROR: instance %s not in instance list %s" %
779
                      (instance, instancelist))
780
      bad = True
781

    
782
    instanceconfig = self.cfg.GetInstanceInfo(instance)
783
    node_current = instanceconfig.primary_node
784

    
785
    node_vol_should = {}
786
    instanceconfig.MapLVsByNode(node_vol_should)
787

    
788
    for node in node_vol_should:
789
      for volume in node_vol_should[node]:
790
        if node not in node_vol_is or volume not in node_vol_is[node]:
791
          feedback_fn("  - ERROR: volume %s missing on node %s" %
792
                          (volume, node))
793
          bad = True
794

    
795
    if not instanceconfig.status == 'down':
796
      if not instance in node_instance[node_current]:
797
        feedback_fn("  - ERROR: instance %s not running on node %s" %
798
                        (instance, node_current))
799
        bad = True
800

    
801
    for node in node_instance:
802
      if (not node == node_current):
803
        if instance in node_instance[node]:
804
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
805
                          (instance, node))
806
          bad = True
807

    
808
    return bad
809

    
810
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
811
    """Verify if there are any unknown volumes in the cluster.
812

813
    The .os, .swap and backup volumes are ignored. All other volumes are
814
    reported as unknown.
815

816
    """
817
    bad = False
818

    
819
    for node in node_vol_is:
820
      for volume in node_vol_is[node]:
821
        if node not in node_vol_should or volume not in node_vol_should[node]:
822
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
823
                      (volume, node))
824
          bad = True
825
    return bad
826

    
827
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
828
    """Verify the list of running instances.
829

830
    This checks what instances are running but unknown to the cluster.
831

832
    """
833
    bad = False
834
    for node in node_instance:
835
      for runninginstance in node_instance[node]:
836
        if runninginstance not in instancelist:
837
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
838
                          (runninginstance, node))
839
          bad = True
840
    return bad
841

    
842
  def CheckPrereq(self):
843
    """Check prerequisites.
844

845
    This has no prerequisites.
846

847
    """
848
    pass
849

    
850
  def Exec(self, feedback_fn):
851
    """Verify integrity of cluster, performing various test on nodes.
852

853
    """
854
    bad = False
855
    feedback_fn("* Verifying global settings")
856
    self.cfg.VerifyConfig()
857

    
858
    vg_name = self.cfg.GetVGName()
859
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
860
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
861
    node_volume = {}
862
    node_instance = {}
863

    
864
    # FIXME: verify OS list
865
    # do local checksums
866
    file_names = list(self.sstore.GetFileList())
867
    file_names.append(constants.SSL_CERT_FILE)
868
    file_names.append(constants.CLUSTER_CONF_FILE)
869
    local_checksums = utils.FingerprintFiles(file_names)
870

    
871
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
872
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
873
    all_instanceinfo = rpc.call_instance_list(nodelist)
874
    all_vglist = rpc.call_vg_list(nodelist)
875
    node_verify_param = {
876
      'filelist': file_names,
877
      'nodelist': nodelist,
878
      'hypervisor': None,
879
      }
880
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
881
    all_rversion = rpc.call_version(nodelist)
882

    
883
    for node in nodelist:
884
      feedback_fn("* Verifying node %s" % node)
885
      result = self._VerifyNode(node, file_names, local_checksums,
886
                                all_vglist[node], all_nvinfo[node],
887
                                all_rversion[node], feedback_fn)
888
      bad = bad or result
889

    
890
      # node_volume
891
      volumeinfo = all_volumeinfo[node]
892

    
893
      if type(volumeinfo) != dict:
894
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
895
        bad = True
896
        continue
897

    
898
      node_volume[node] = volumeinfo
899

    
900
      # node_instance
901
      nodeinstance = all_instanceinfo[node]
902
      if type(nodeinstance) != list:
903
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
904
        bad = True
905
        continue
906

    
907
      node_instance[node] = nodeinstance
908

    
909
    node_vol_should = {}
910

    
911
    for instance in instancelist:
912
      feedback_fn("* Verifying instance %s" % instance)
913
      result =  self._VerifyInstance(instance, node_volume, node_instance,
914
                                     feedback_fn)
915
      bad = bad or result
916

    
917
      inst_config = self.cfg.GetInstanceInfo(instance)
918

    
919
      inst_config.MapLVsByNode(node_vol_should)
920

    
921
    feedback_fn("* Verifying orphan volumes")
922
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
923
                                       feedback_fn)
924
    bad = bad or result
925

    
926
    feedback_fn("* Verifying remaining instances")
927
    result = self._VerifyOrphanInstances(instancelist, node_instance,
928
                                         feedback_fn)
929
    bad = bad or result
930

    
931
    return int(bad)
932

    
933

    
934
class LURenameCluster(LogicalUnit):
935
  """Rename the cluster.
936

937
  """
938
  HPATH = "cluster-rename"
939
  HTYPE = constants.HTYPE_CLUSTER
940
  _OP_REQP = ["name"]
941

    
942
  def BuildHooksEnv(self):
943
    """Build hooks env.
944

945
    """
946
    env = {
947
      "OP_TARGET": self.op.sstore.GetClusterName(),
948
      "NEW_NAME": self.op.name,
949
      }
950
    mn = self.sstore.GetMasterNode()
951
    return env, [mn], [mn]
952

    
953
  def CheckPrereq(self):
954
    """Verify that the passed name is a valid one.
955

956
    """
957
    hostname = utils.HostInfo(self.op.name)
958

    
959
    new_name = hostname.name
960
    self.ip = new_ip = hostname.ip
961
    old_name = self.sstore.GetClusterName()
962
    old_ip = self.sstore.GetMasterIP()
963
    if new_name == old_name and new_ip == old_ip:
964
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
965
                                 " cluster has changed")
966
    if new_ip != old_ip:
967
      result = utils.RunCmd(["fping", "-q", new_ip])
968
      if not result.failed:
969
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
970
                                   " reachable on the network. Aborting." %
971
                                   new_ip)
972

    
973
    self.op.name = new_name
974

    
975
  def Exec(self, feedback_fn):
976
    """Rename the cluster.
977

978
    """
979
    clustername = self.op.name
980
    ip = self.ip
981
    ss = self.sstore
982

    
983
    # shutdown the master IP
984
    master = ss.GetMasterNode()
985
    if not rpc.call_node_stop_master(master):
986
      raise errors.OpExecError("Could not disable the master role")
987

    
988
    try:
989
      # modify the sstore
990
      ss.SetKey(ss.SS_MASTER_IP, ip)
991
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
992

    
993
      # Distribute updated ss config to all nodes
994
      myself = self.cfg.GetNodeInfo(master)
995
      dist_nodes = self.cfg.GetNodeList()
996
      if myself.name in dist_nodes:
997
        dist_nodes.remove(myself.name)
998

    
999
      logger.Debug("Copying updated ssconf data to all nodes")
1000
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1001
        fname = ss.KeyToFilename(keyname)
1002
        result = rpc.call_upload_file(dist_nodes, fname)
1003
        for to_node in dist_nodes:
1004
          if not result[to_node]:
1005
            logger.Error("copy of file %s to node %s failed" %
1006
                         (fname, to_node))
1007
    finally:
1008
      if not rpc.call_node_start_master(master):
1009
        logger.Error("Could not re-enable the master role on the master,\n"
1010
                     "please restart manually.")
1011

    
1012

    
1013
def _WaitForSync(cfgw, instance, oneshot=False, unlock=False):
1014
  """Sleep and poll for an instance's disk to sync.
1015

1016
  """
1017
  if not instance.disks:
1018
    return True
1019

    
1020
  if not oneshot:
1021
    logger.ToStdout("Waiting for instance %s to sync disks." % instance.name)
1022

    
1023
  node = instance.primary_node
1024

    
1025
  for dev in instance.disks:
1026
    cfgw.SetDiskID(dev, node)
1027

    
1028
  retries = 0
1029
  while True:
1030
    max_time = 0
1031
    done = True
1032
    cumul_degraded = False
1033
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1034
    if not rstats:
1035
      logger.ToStderr("Can't get any data from node %s" % node)
1036
      retries += 1
1037
      if retries >= 10:
1038
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1039
                                 " aborting." % node)
1040
      time.sleep(6)
1041
      continue
1042
    retries = 0
1043
    for i in range(len(rstats)):
1044
      mstat = rstats[i]
1045
      if mstat is None:
1046
        logger.ToStderr("Can't compute data for node %s/%s" %
1047
                        (node, instance.disks[i].iv_name))
1048
        continue
1049
      perc_done, est_time, is_degraded = mstat
1050
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1051
      if perc_done is not None:
1052
        done = False
1053
        if est_time is not None:
1054
          rem_time = "%d estimated seconds remaining" % est_time
1055
          max_time = est_time
1056
        else:
1057
          rem_time = "no time estimate"
1058
        logger.ToStdout("- device %s: %5.2f%% done, %s" %
1059
                        (instance.disks[i].iv_name, perc_done, rem_time))
1060
    if done or oneshot:
1061
      break
1062

    
1063
    if unlock:
1064
      utils.Unlock('cmd')
1065
    try:
1066
      time.sleep(min(60, max_time))
1067
    finally:
1068
      if unlock:
1069
        utils.Lock('cmd')
1070

    
1071
  if done:
1072
    logger.ToStdout("Instance %s's disks are in sync." % instance.name)
1073
  return not cumul_degraded
1074

    
1075

    
1076
def _CheckDiskConsistency(cfgw, dev, node, on_primary):
1077
  """Check that mirrors are not degraded.
1078

1079
  """
1080
  cfgw.SetDiskID(dev, node)
1081

    
1082
  result = True
1083
  if on_primary or dev.AssembleOnSecondary():
1084
    rstats = rpc.call_blockdev_find(node, dev)
1085
    if not rstats:
1086
      logger.ToStderr("Can't get any data from node %s" % node)
1087
      result = False
1088
    else:
1089
      result = result and (not rstats[5])
1090
  if dev.children:
1091
    for child in dev.children:
1092
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1093

    
1094
  return result
1095

    
1096

    
1097
class LUDiagnoseOS(NoHooksLU):
1098
  """Logical unit for OS diagnose/query.
1099

1100
  """
1101
  _OP_REQP = []
1102

    
1103
  def CheckPrereq(self):
1104
    """Check prerequisites.
1105

1106
    This always succeeds, since this is a pure query LU.
1107

1108
    """
1109
    return
1110

    
1111
  def Exec(self, feedback_fn):
1112
    """Compute the list of OSes.
1113

1114
    """
1115
    node_list = self.cfg.GetNodeList()
1116
    node_data = rpc.call_os_diagnose(node_list)
1117
    if node_data == False:
1118
      raise errors.OpExecError("Can't gather the list of OSes")
1119
    return node_data
1120

    
1121

    
1122
class LURemoveNode(LogicalUnit):
1123
  """Logical unit for removing a node.
1124

1125
  """
1126
  HPATH = "node-remove"
1127
  HTYPE = constants.HTYPE_NODE
1128
  _OP_REQP = ["node_name"]
1129

    
1130
  def BuildHooksEnv(self):
1131
    """Build hooks env.
1132

1133
    This doesn't run on the target node in the pre phase as a failed
1134
    node would not allow itself to run.
1135

1136
    """
1137
    env = {
1138
      "OP_TARGET": self.op.node_name,
1139
      "NODE_NAME": self.op.node_name,
1140
      }
1141
    all_nodes = self.cfg.GetNodeList()
1142
    all_nodes.remove(self.op.node_name)
1143
    return env, all_nodes, all_nodes
1144

    
1145
  def CheckPrereq(self):
1146
    """Check prerequisites.
1147

1148
    This checks:
1149
     - the node exists in the configuration
1150
     - it does not have primary or secondary instances
1151
     - it's not the master
1152

1153
    Any errors are signalled by raising errors.OpPrereqError.
1154

1155
    """
1156
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1157
    if node is None:
1158
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1159

    
1160
    instance_list = self.cfg.GetInstanceList()
1161

    
1162
    masternode = self.sstore.GetMasterNode()
1163
    if node.name == masternode:
1164
      raise errors.OpPrereqError("Node is the master node,"
1165
                                 " you need to failover first.")
1166

    
1167
    for instance_name in instance_list:
1168
      instance = self.cfg.GetInstanceInfo(instance_name)
1169
      if node.name == instance.primary_node:
1170
        raise errors.OpPrereqError("Instance %s still running on the node,"
1171
                                   " please remove first." % instance_name)
1172
      if node.name in instance.secondary_nodes:
1173
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1174
                                   " please remove first." % instance_name)
1175
    self.op.node_name = node.name
1176
    self.node = node
1177

    
1178
  def Exec(self, feedback_fn):
1179
    """Removes the node from the cluster.
1180

1181
    """
1182
    node = self.node
1183
    logger.Info("stopping the node daemon and removing configs from node %s" %
1184
                node.name)
1185

    
1186
    rpc.call_node_leave_cluster(node.name)
1187

    
1188
    ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1189

    
1190
    logger.Info("Removing node %s from config" % node.name)
1191

    
1192
    self.cfg.RemoveNode(node.name)
1193

    
1194

    
1195
class LUQueryNodes(NoHooksLU):
1196
  """Logical unit for querying nodes.
1197

1198
  """
1199
  _OP_REQP = ["output_fields", "names"]
1200

    
1201
  def CheckPrereq(self):
1202
    """Check prerequisites.
1203

1204
    This checks that the fields required are valid output fields.
1205

1206
    """
1207
    self.dynamic_fields = frozenset(["dtotal", "dfree",
1208
                                     "mtotal", "mnode", "mfree",
1209
                                     "bootid"])
1210

    
1211
    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1212
                               "pinst_list", "sinst_list",
1213
                               "pip", "sip"],
1214
                       dynamic=self.dynamic_fields,
1215
                       selected=self.op.output_fields)
1216

    
1217
    self.wanted = _GetWantedNodes(self, self.op.names)
1218

    
1219
  def Exec(self, feedback_fn):
1220
    """Computes the list of nodes and their attributes.
1221

1222
    """
1223
    nodenames = self.wanted
1224
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1225

    
1226
    # begin data gathering
1227

    
1228
    if self.dynamic_fields.intersection(self.op.output_fields):
1229
      live_data = {}
1230
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1231
      for name in nodenames:
1232
        nodeinfo = node_data.get(name, None)
1233
        if nodeinfo:
1234
          live_data[name] = {
1235
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1236
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1237
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1238
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1239
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1240
            "bootid": nodeinfo['bootid'],
1241
            }
1242
        else:
1243
          live_data[name] = {}
1244
    else:
1245
      live_data = dict.fromkeys(nodenames, {})
1246

    
1247
    node_to_primary = dict([(name, set()) for name in nodenames])
1248
    node_to_secondary = dict([(name, set()) for name in nodenames])
1249

    
1250
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1251
                             "sinst_cnt", "sinst_list"))
1252
    if inst_fields & frozenset(self.op.output_fields):
1253
      instancelist = self.cfg.GetInstanceList()
1254

    
1255
      for instance_name in instancelist:
1256
        inst = self.cfg.GetInstanceInfo(instance_name)
1257
        if inst.primary_node in node_to_primary:
1258
          node_to_primary[inst.primary_node].add(inst.name)
1259
        for secnode in inst.secondary_nodes:
1260
          if secnode in node_to_secondary:
1261
            node_to_secondary[secnode].add(inst.name)
1262

    
1263
    # end data gathering
1264

    
1265
    output = []
1266
    for node in nodelist:
1267
      node_output = []
1268
      for field in self.op.output_fields:
1269
        if field == "name":
1270
          val = node.name
1271
        elif field == "pinst_list":
1272
          val = list(node_to_primary[node.name])
1273
        elif field == "sinst_list":
1274
          val = list(node_to_secondary[node.name])
1275
        elif field == "pinst_cnt":
1276
          val = len(node_to_primary[node.name])
1277
        elif field == "sinst_cnt":
1278
          val = len(node_to_secondary[node.name])
1279
        elif field == "pip":
1280
          val = node.primary_ip
1281
        elif field == "sip":
1282
          val = node.secondary_ip
1283
        elif field in self.dynamic_fields:
1284
          val = live_data[node.name].get(field, None)
1285
        else:
1286
          raise errors.ParameterError(field)
1287
        node_output.append(val)
1288
      output.append(node_output)
1289

    
1290
    return output
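
# Illustrative sketch, not part of the original module: with
# output_fields = ["name", "pinst_cnt", "mfree"], Exec above returns one
# row per node in the requested field order (values below are made up;
# dynamic fields come back as None when the node info RPC fails).
_EXAMPLE_NODE_QUERY_OUTPUT = [
  ["node1.example.com", 2, 1024],
  ["node2.example.com", 0, 2048],
  ]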
1291

    
1292

    
1293
class LUQueryNodeVolumes(NoHooksLU):
1294
  """Logical unit for getting volumes on node(s).
1295

1296
  """
1297
  _OP_REQP = ["nodes", "output_fields"]
1298

    
1299
  def CheckPrereq(self):
1300
    """Check prerequisites.
1301

1302
    This checks that the fields required are valid output fields.
1303

1304
    """
1305
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1306

    
1307
    _CheckOutputFields(static=["node"],
1308
                       dynamic=["phys", "vg", "name", "size", "instance"],
1309
                       selected=self.op.output_fields)
1310

    
1311

    
1312
  def Exec(self, feedback_fn):
1313
    """Computes the list of nodes and their attributes.
1314

1315
    """
1316
    nodenames = self.nodes
1317
    volumes = rpc.call_node_volumes(nodenames)
1318

    
1319
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1320
             in self.cfg.GetInstanceList()]
1321

    
1322
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1323

    
1324
    output = []
1325
    for node in nodenames:
1326
      if node not in volumes or not volumes[node]:
1327
        continue
1328

    
1329
      node_vols = volumes[node][:]
1330
      node_vols.sort(key=lambda vol: vol['dev'])
1331

    
1332
      for vol in node_vols:
1333
        node_output = []
1334
        for field in self.op.output_fields:
1335
          if field == "node":
1336
            val = node
1337
          elif field == "phys":
1338
            val = vol['dev']
1339
          elif field == "vg":
1340
            val = vol['vg']
1341
          elif field == "name":
1342
            val = vol['name']
1343
          elif field == "size":
1344
            val = int(float(vol['size']))
1345
          elif field == "instance":
1346
            for inst in ilist:
1347
              if node not in lv_by_node[inst]:
1348
                continue
1349
              if vol['name'] in lv_by_node[inst][node]:
1350
                val = inst.name
1351
                break
1352
            else:
1353
              val = '-'
1354
          else:
1355
            raise errors.ParameterError(field)
1356
          node_output.append(str(val))
1357

    
1358
        output.append(node_output)
1359

    
1360
    return output
1361

    
1362

    
1363
class LUAddNode(LogicalUnit):
1364
  """Logical unit for adding node to the cluster.
1365

1366
  """
1367
  HPATH = "node-add"
1368
  HTYPE = constants.HTYPE_NODE
1369
  _OP_REQP = ["node_name"]
1370

    
1371
  def BuildHooksEnv(self):
1372
    """Build hooks env.
1373

1374
    This will run on all nodes before, and on all nodes + the new node after.
1375

1376
    """
1377
    env = {
1378
      "OP_TARGET": self.op.node_name,
1379
      "NODE_NAME": self.op.node_name,
1380
      "NODE_PIP": self.op.primary_ip,
1381
      "NODE_SIP": self.op.secondary_ip,
1382
      }
1383
    nodes_0 = self.cfg.GetNodeList()
1384
    nodes_1 = nodes_0 + [self.op.node_name, ]
1385
    return env, nodes_0, nodes_1
1386

    
1387
  def CheckPrereq(self):
1388
    """Check prerequisites.
1389

1390
    This checks:
1391
     - the new node is not already in the config
1392
     - it is resolvable
1393
     - its parameters (single/dual homed) matches the cluster
1394

1395
    Any errors are signalled by raising errors.OpPrereqError.
1396

1397
    """
1398
    node_name = self.op.node_name
1399
    cfg = self.cfg
1400

    
1401
    dns_data = utils.HostInfo(node_name)
1402

    
1403
    node = dns_data.name
1404
    primary_ip = self.op.primary_ip = dns_data.ip
1405
    secondary_ip = getattr(self.op, "secondary_ip", None)
1406
    if secondary_ip is None:
1407
      secondary_ip = primary_ip
1408
    if not utils.IsValidIP(secondary_ip):
1409
      raise errors.OpPrereqError("Invalid secondary IP given")
1410
    self.op.secondary_ip = secondary_ip
1411
    node_list = cfg.GetNodeList()
1412
    if node in node_list:
1413
      raise errors.OpPrereqError("Node %s is already in the configuration"
1414
                                 % node)
1415

    
1416
    for existing_node_name in node_list:
1417
      existing_node = cfg.GetNodeInfo(existing_node_name)
1418
      if (existing_node.primary_ip == primary_ip or
1419
          existing_node.secondary_ip == primary_ip or
1420
          existing_node.primary_ip == secondary_ip or
1421
          existing_node.secondary_ip == secondary_ip):
1422
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1423
                                   " existing node %s" % existing_node.name)
1424

    
1425
    # check that the type of the node (single versus dual homed) is the
1426
    # same as for the master
1427
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1428
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1429
    newbie_singlehomed = secondary_ip == primary_ip
1430
    if master_singlehomed != newbie_singlehomed:
1431
      if master_singlehomed:
1432
        raise errors.OpPrereqError("The master has no private ip but the"
1433
                                   " new node has one")
1434
      else:
1435
        raise errors.OpPrereqError("The master has a private ip but the"
1436
                                   " new node doesn't have one")
1437

    
1438
    # checks reachability
1439
    if not utils.TcpPing(utils.HostInfo().name,
1440
                         primary_ip,
1441
                         constants.DEFAULT_NODED_PORT):
1442
      raise errors.OpPrereqError("Node not reachable by ping")
1443

    
1444
    if not newbie_singlehomed:
1445
      # check reachability from my secondary ip to newbie's secondary ip
1446
      if not utils.TcpPing(myself.secondary_ip,
1447
                           secondary_ip,
1448
                           constants.DEFAULT_NODED_PORT):
1449
        raise errors.OpPrereqError(
1450
          "Node secondary ip not reachable by TCP based ping to noded port")
1451

    
1452
    self.new_node = objects.Node(name=node,
1453
                                 primary_ip=primary_ip,
1454
                                 secondary_ip=secondary_ip)
1455

    
1456
  def Exec(self, feedback_fn):
1457
    """Adds the new node to the cluster.
1458

1459
    """
1460
    new_node = self.new_node
1461
    node = new_node.name
1462

    
1463
    # set up inter-node password and certificate and restart the node daemon
1464
    gntpass = self.sstore.GetNodeDaemonPassword()
1465
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1466
      raise errors.OpExecError("ganeti password corruption detected")
1467
    f = open(constants.SSL_CERT_FILE)
1468
    try:
1469
      gntpem = f.read(8192)
1470
    finally:
1471
      f.close()
1472
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1473
    # so we use this to detect an invalid certificate; as long as the
1474
    # cert doesn't contain this, the here-document will be correctly
1475
    # parsed by the shell sequence below
1476
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
1477
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1478
    if not gntpem.endswith("\n"):
1479
      raise errors.OpExecError("PEM must end with newline")
1480
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1481

    
1482
    # and then connect with ssh to set password and start ganeti-noded
1483
    # note that all the below variables are sanitized at this point,
1484
    # either by being constants or by the checks above
1485
    ss = self.sstore
1486
    mycommand = ("umask 077 && "
1487
                 "echo '%s' > '%s' && "
1488
                 "cat > '%s' << '!EOF.' && \n"
1489
                 "%s!EOF.\n%s restart" %
1490
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1491
                  constants.SSL_CERT_FILE, gntpem,
1492
                  constants.NODE_INITD_SCRIPT))
1493

    
1494
    result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1495
    if result.failed:
1496
      raise errors.OpExecError("Remote command on node %s, error: %s,"
1497
                               " output: %s" %
1498
                               (node, result.fail_reason, result.output))
1499

    
1500
    # check connectivity
1501
    time.sleep(4)
1502

    
1503
    result = rpc.call_version([node])[node]
1504
    if result:
1505
      if constants.PROTOCOL_VERSION == result:
1506
        logger.Info("communication to node %s fine, sw version %s match" %
1507
                    (node, result))
1508
      else:
1509
        raise errors.OpExecError("Version mismatch master version %s,"
1510
                                 " node version %s" %
1511
                                 (constants.PROTOCOL_VERSION, result))
1512
    else:
1513
      raise errors.OpExecError("Cannot get version from the new node")
1514

    
1515
    # setup ssh on node
1516
    logger.Info("copy ssh key to node %s" % node)
1517
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1518
    keyarray = []
1519
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1520
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1521
                priv_key, pub_key]
1522

    
1523
    for i in keyfiles:
1524
      f = open(i, 'r')
1525
      try:
1526
        keyarray.append(f.read())
1527
      finally:
1528
        f.close()
1529

    
1530
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1531
                               keyarray[3], keyarray[4], keyarray[5])
1532

    
1533
    if not result:
1534
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1535

    
1536
    # Add node to our /etc/hosts, and add key to known_hosts
1537
    _UpdateEtcHosts(new_node.name, new_node.primary_ip)
1538
    _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1539
                      self.cfg.GetHostKey())
1540

    
1541
    if new_node.secondary_ip != new_node.primary_ip:
1542
      if not rpc.call_node_tcp_ping(new_node.name,
1543
                                    constants.LOCALHOST_IP_ADDRESS,
1544
                                    new_node.secondary_ip,
1545
                                    constants.DEFAULT_NODED_PORT,
1546
                                    10, False):
1547
        raise errors.OpExecError("Node claims it doesn't have the"
1548
                                 " secondary ip you gave (%s).\n"
1549
                                 "Please fix and re-run this command." %
1550
                                 new_node.secondary_ip)
1551

    
1552
    success, msg = ssh.VerifyNodeHostname(node)
1553
    if not success:
1554
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
1555
                               " than the one the resolver gives: %s.\n"
1556
                               "Please fix and re-run this command." %
1557
                               (node, msg))
1558

    
1559
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1560
    # including the node just added
1561
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1562
    dist_nodes = self.cfg.GetNodeList() + [node]
1563
    if myself.name in dist_nodes:
1564
      dist_nodes.remove(myself.name)
1565

    
1566
    logger.Debug("Copying hosts and known_hosts to all nodes")
1567
    for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
1568
      result = rpc.call_upload_file(dist_nodes, fname)
1569
      for to_node in dist_nodes:
1570
        if not result[to_node]:
1571
          logger.Error("copy of file %s to node %s failed" %
1572
                       (fname, to_node))
1573

    
1574
    to_copy = self.sstore.GetFileList()
1575
    for fname in to_copy:
1576
      if not ssh.CopyFileToNode(node, fname):
1577
        logger.Error("could not copy file %s to node %s" % (fname, node))
1578

    
1579
    logger.Info("adding node %s to cluster.conf" % node)
1580
    self.cfg.AddNode(new_node)
1581

    
1582

    
1583
class LUMasterFailover(LogicalUnit):
1584
  """Failover the master node to the current node.
1585

1586
  This is a special LU in that it must run on a non-master node.
1587

1588
  """
1589
  HPATH = "master-failover"
1590
  HTYPE = constants.HTYPE_CLUSTER
1591
  REQ_MASTER = False
1592
  _OP_REQP = []
1593

    
1594
  def BuildHooksEnv(self):
1595
    """Build hooks env.
1596

1597
    This will run on the new master only in the pre phase, and on all
1598
    the nodes in the post phase.
1599

1600
    """
1601
    env = {
1602
      "OP_TARGET": self.new_master,
1603
      "NEW_MASTER": self.new_master,
1604
      "OLD_MASTER": self.old_master,
1605
      }
1606
    return env, [self.new_master], self.cfg.GetNodeList()
1607

    
1608
  def CheckPrereq(self):
1609
    """Check prerequisites.
1610

1611
    This checks that we are not already the master.
1612

1613
    """
1614
    self.new_master = utils.HostInfo().name
1615
    self.old_master = self.sstore.GetMasterNode()
1616

    
1617
    if self.old_master == self.new_master:
1618
      raise errors.OpPrereqError("This commands must be run on the node"
1619
                                 " where you want the new master to be.\n"
1620
                                 "%s is already the master" %
1621
                                 self.old_master)
1622

    
1623
  def Exec(self, feedback_fn):
1624
    """Failover the master node.
1625

1626
    This command, when run on a non-master node, will cause the current
1627
    master to cease being master, and the non-master to become new
1628
    master.
1629

1630
    """
1631
    #TODO: do not rely on gethostname returning the FQDN
1632
    logger.Info("setting master to %s, old master: %s" %
1633
                (self.new_master, self.old_master))
1634

    
1635
    if not rpc.call_node_stop_master(self.old_master):
1636
      logger.Error("could disable the master role on the old master"
1637
                   " %s, please disable manually" % self.old_master)
1638

    
1639
    ss = self.sstore
1640
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1641
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
1642
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
1643
      logger.Error("could not distribute the new simple store master file"
1644
                   " to the other nodes, please check.")
1645

    
1646
    if not rpc.call_node_start_master(self.new_master):
1647
      logger.Error("could not start the master role on the new master"
1648
                   " %s, please check" % self.new_master)
1649
      feedback_fn("Error in activating the master IP on the new master,\n"
1650
                  "please fix manually.")
1651

    
1652

    
1653

    
1654
class LUQueryClusterInfo(NoHooksLU):
1655
  """Query cluster configuration.
1656

1657
  """
1658
  _OP_REQP = []
1659
  REQ_MASTER = False
1660

    
1661
  def CheckPrereq(self):
1662
    """No prerequsites needed for this LU.
1663

1664
    """
1665
    pass
1666

    
1667
  def Exec(self, feedback_fn):
1668
    """Return cluster config.
1669

1670
    """
1671
    result = {
1672
      "name": self.sstore.GetClusterName(),
1673
      "software_version": constants.RELEASE_VERSION,
1674
      "protocol_version": constants.PROTOCOL_VERSION,
1675
      "config_version": constants.CONFIG_VERSION,
1676
      "os_api_version": constants.OS_API_VERSION,
1677
      "export_version": constants.EXPORT_VERSION,
1678
      "master": self.sstore.GetMasterNode(),
1679
      "architecture": (platform.architecture()[0], platform.machine()),
1680
      }
1681

    
1682
    return result
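    # Purely illustrative sketch of the returned dict (hypothetical values):
    #   {"name": "cluster.example.com",
    #    "master": "node1.example.com",
    #    "architecture": ("64bit", "x86_64"),
    #    ...}
    # with the remaining keys holding the version constants listed above.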
1683

    
1684

    
1685
class LUClusterCopyFile(NoHooksLU):
1686
  """Copy file to cluster.
1687

1688
  """
1689
  _OP_REQP = ["nodes", "filename"]
1690

    
1691
  def CheckPrereq(self):
1692
    """Check prerequisites.
1693

1694
    It should check that the named file exists and that the given list
1695
    of nodes is valid.
1696

1697
    """
1698
    if not os.path.exists(self.op.filename):
1699
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1700

    
1701
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1702

    
1703
  def Exec(self, feedback_fn):
1704
    """Copy a file from master to some nodes.
1705

1706
    Opcode parameters used:
      filename - the name of the file to copy
      nodes - list containing the names of the target nodes; if empty, all nodes
1711

1712
    """
1713
    filename = self.op.filename
1714

    
1715
    myname = utils.HostInfo().name
1716

    
1717
    for node in self.nodes:
1718
      if node == myname:
1719
        continue
1720
      if not ssh.CopyFileToNode(node, filename):
1721
        logger.Error("Copy of file %s to node %s failed" % (filename, node))
1722

    
1723

    
1724
class LUDumpClusterConfig(NoHooksLU):
1725
  """Return a text-representation of the cluster-config.
1726

1727
  """
1728
  _OP_REQP = []
1729

    
1730
  def CheckPrereq(self):
1731
    """No prerequisites.
1732

1733
    """
1734
    pass
1735

    
1736
  def Exec(self, feedback_fn):
1737
    """Dump a representation of the cluster config to the standard output.
1738

1739
    """
1740
    return self.cfg.DumpConfig()
1741

    
1742

    
1743
class LURunClusterCommand(NoHooksLU):
1744
  """Run a command on some nodes.
1745

1746
  """
1747
  _OP_REQP = ["command", "nodes"]
1748

    
1749
  def CheckPrereq(self):
1750
    """Check prerequisites.
1751

1752
    It checks that the given list of nodes is valid.
1753

1754
    """
1755
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1756

    
1757
  def Exec(self, feedback_fn):
1758
    """Run a command on some nodes.
1759

1760
    """
1761
    data = []
1762
    for node in self.nodes:
1763
      result = ssh.SSHCall(node, "root", self.op.command)
1764
      data.append((node, result.output, result.exit_code))
1765

    
1766
    return data
1767

    
1768

    
1769
class LUActivateInstanceDisks(NoHooksLU):
1770
  """Bring up an instance's disks.
1771

1772
  """
1773
  _OP_REQP = ["instance_name"]
1774

    
1775
  def CheckPrereq(self):
1776
    """Check prerequisites.
1777

1778
    This checks that the instance is in the cluster.
1779

1780
    """
1781
    instance = self.cfg.GetInstanceInfo(
1782
      self.cfg.ExpandInstanceName(self.op.instance_name))
1783
    if instance is None:
1784
      raise errors.OpPrereqError("Instance '%s' not known" %
1785
                                 self.op.instance_name)
1786
    self.instance = instance
1787

    
1788

    
1789
  def Exec(self, feedback_fn):
1790
    """Activate the disks.
1791

1792
    """
1793
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1794
    if not disks_ok:
1795
      raise errors.OpExecError("Cannot activate block devices")
1796

    
1797
    return disks_info
1798

    
1799

    
1800
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1801
  """Prepare the block devices for an instance.
1802

1803
  This sets up the block devices on all nodes.
1804

1805
  Args:
1806
    instance: a ganeti.objects.Instance object
1807
    ignore_secondaries: if true, errors on secondary nodes won't result
1808
                        in an error return from the function
1809

1810
  Returns:
1811
    a tuple of (disks_ok, device_info): disks_ok is false if the operation
    failed, and device_info is the list of (host, instance_visible_name,
    node_visible_name) tuples that maps node devices to instance devices
1814
  """
1815
  device_info = []
1816
  disks_ok = True
1817
  for inst_disk in instance.disks:
1818
    master_result = None
1819
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1820
      cfg.SetDiskID(node_disk, node)
1821
      is_primary = node == instance.primary_node
1822
      result = rpc.call_blockdev_assemble(node, node_disk,
1823
                                          instance.name, is_primary)
1824
      if not result:
1825
        logger.Error("could not prepare block device %s on node %s (is_pri"
1826
                     "mary=%s)" % (inst_disk.iv_name, node, is_primary))
1827
        if is_primary or not ignore_secondaries:
1828
          disks_ok = False
1829
      if is_primary:
1830
        master_result = result
1831
    device_info.append((instance.primary_node, inst_disk.iv_name,
1832
                        master_result))
1833

    
1834
  # leave the disks configured for the primary node
1835
  # this is a workaround that would be fixed better by
1836
  # improving the logical/physical id handling
1837
  for disk in instance.disks:
1838
    cfg.SetDiskID(disk, instance.primary_node)
1839

    
1840
  return disks_ok, device_info
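  # Illustrative shape of the return value (names are hypothetical):
  #   (True, [("node1.example.com", "sda", <result for sda>),
  #           ("node1.example.com", "sdb", <result for sdb>)])
  # i.e. one entry per instance disk with the assemble result reported by
  # the primary node.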
1841

    
1842

    
1843
def _StartInstanceDisks(cfg, instance, force):
1844
  """Start the disks of an instance.
1845

1846
  """
1847
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1848
                                           ignore_secondaries=force)
1849
  if not disks_ok:
1850
    _ShutdownInstanceDisks(instance, cfg)
1851
    if force is not None and not force:
1852
      logger.Error("If the message above refers to a secondary node,"
1853
                   " you can retry the operation using '--force'.")
1854
    raise errors.OpExecError("Disk consistency error")
1855

    
1856

    
1857
class LUDeactivateInstanceDisks(NoHooksLU):
1858
  """Shutdown an instance's disks.
1859

1860
  """
1861
  _OP_REQP = ["instance_name"]
1862

    
1863
  def CheckPrereq(self):
1864
    """Check prerequisites.
1865

1866
    This checks that the instance is in the cluster.
1867

1868
    """
1869
    instance = self.cfg.GetInstanceInfo(
1870
      self.cfg.ExpandInstanceName(self.op.instance_name))
1871
    if instance is None:
1872
      raise errors.OpPrereqError("Instance '%s' not known" %
1873
                                 self.op.instance_name)
1874
    self.instance = instance
1875

    
1876
  def Exec(self, feedback_fn):
1877
    """Deactivate the disks
1878

1879
    """
1880
    instance = self.instance
1881
    ins_l = rpc.call_instance_list([instance.primary_node])
1882
    ins_l = ins_l[instance.primary_node]
1883
    if not isinstance(ins_l, list):
1884
      raise errors.OpExecError("Can't contact node '%s'" %
1885
                               instance.primary_node)
1886

    
1887
    if self.instance.name in ins_l:
1888
      raise errors.OpExecError("Instance is running, can't shutdown"
1889
                               " block devices.")
1890

    
1891
    _ShutdownInstanceDisks(instance, self.cfg)
1892

    
1893

    
1894
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1895
  """Shutdown block devices of an instance.
1896

1897
  This does the shutdown on all nodes of the instance.
1898

1899
  If ignore_primary is true, errors on the primary node are
1900
  ignored.
1901

1902
  """
1903
  result = True
1904
  for disk in instance.disks:
1905
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1906
      cfg.SetDiskID(top_disk, node)
1907
      if not rpc.call_blockdev_shutdown(node, top_disk):
1908
        logger.Error("could not shutdown block device %s on node %s" %
1909
                     (disk.iv_name, node))
1910
        if not ignore_primary or node != instance.primary_node:
1911
          result = False
1912
  return result
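  # Typical usage in this module: called with the default ignore_primary
  # when cleanly deactivating disks, and with ignore_primary=True during
  # failover, where an error on the primary node should not abort the
  # operation.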
1913

    
1914

    
1915
class LUStartupInstance(LogicalUnit):
1916
  """Starts an instance.
1917

1918
  """
1919
  HPATH = "instance-start"
1920
  HTYPE = constants.HTYPE_INSTANCE
1921
  _OP_REQP = ["instance_name", "force"]
1922

    
1923
  def BuildHooksEnv(self):
1924
    """Build hooks env.
1925

1926
    This runs on master, primary and secondary nodes of the instance.
1927

1928
    """
1929
    env = {
1930
      "FORCE": self.op.force,
1931
      }
1932
    env.update(_BuildInstanceHookEnvByObject(self.instance))
1933
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1934
          list(self.instance.secondary_nodes))
1935
    return env, nl, nl
1936

    
1937
  def CheckPrereq(self):
1938
    """Check prerequisites.
1939

1940
    This checks that the instance is in the cluster.
1941

1942
    """
1943
    instance = self.cfg.GetInstanceInfo(
1944
      self.cfg.ExpandInstanceName(self.op.instance_name))
1945
    if instance is None:
1946
      raise errors.OpPrereqError("Instance '%s' not known" %
1947
                                 self.op.instance_name)
1948

    
1949
    # check bridges existence
1950
    _CheckInstanceBridgesExist(instance)
1951

    
1952
    self.instance = instance
1953
    self.op.instance_name = instance.name
1954

    
1955
  def Exec(self, feedback_fn):
1956
    """Start the instance.
1957

1958
    """
1959
    instance = self.instance
1960
    force = self.op.force
1961
    extra_args = getattr(self.op, "extra_args", "")
1962

    
1963
    node_current = instance.primary_node
1964

    
1965
    nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
1966
    if not nodeinfo:
1967
      raise errors.OpExecError("Could not contact node %s for infos" %
1968
                               (node_current))
1969

    
1970
    freememory = nodeinfo[node_current]['memory_free']
1971
    memory = instance.memory
1972
    if memory > freememory:
1973
      raise errors.OpExecError("Not enough memory to start instance"
1974
                               " %s on node %s"
1975
                               " needed %s MiB, available %s MiB" %
1976
                               (instance.name, node_current, memory,
1977
                                freememory))
1978

    
1979
    _StartInstanceDisks(self.cfg, instance, force)
1980

    
1981
    if not rpc.call_instance_start(node_current, instance, extra_args):
1982
      _ShutdownInstanceDisks(instance, self.cfg)
1983
      raise errors.OpExecError("Could not start instance")
1984

    
1985
    self.cfg.MarkInstanceUp(instance.name)
1986

    
1987

    
1988
class LURebootInstance(LogicalUnit):
1989
  """Reboot an instance.
1990

1991
  """
1992
  HPATH = "instance-reboot"
1993
  HTYPE = constants.HTYPE_INSTANCE
1994
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
1995

    
1996
  def BuildHooksEnv(self):
1997
    """Build hooks env.
1998

1999
    This runs on master, primary and secondary nodes of the instance.
2000

2001
    """
2002
    env = {
2003
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2004
      }
2005
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2006
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2007
          list(self.instance.secondary_nodes))
2008
    return env, nl, nl
2009

    
2010
  def CheckPrereq(self):
2011
    """Check prerequisites.
2012

2013
    This checks that the instance is in the cluster.
2014

2015
    """
2016
    instance = self.cfg.GetInstanceInfo(
2017
      self.cfg.ExpandInstanceName(self.op.instance_name))
2018
    if instance is None:
2019
      raise errors.OpPrereqError("Instance '%s' not known" %
2020
                                 self.op.instance_name)
2021

    
2022
    # check bridges existence
2023
    _CheckInstanceBridgesExist(instance)
2024

    
2025
    self.instance = instance
2026
    self.op.instance_name = instance.name
2027

    
2028
  def Exec(self, feedback_fn):
2029
    """Reboot the instance.
2030

2031
    """
2032
    instance = self.instance
2033
    ignore_secondaries = self.op.ignore_secondaries
2034
    reboot_type = self.op.reboot_type
2035
    extra_args = getattr(self.op, "extra_args", "")
2036

    
2037
    node_current = instance.primary_node
2038

    
2039
    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2040
                           constants.INSTANCE_REBOOT_HARD,
2041
                           constants.INSTANCE_REBOOT_FULL]:
2042
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2043
                                  (constants.INSTANCE_REBOOT_SOFT,
2044
                                   constants.INSTANCE_REBOOT_HARD,
2045
                                   constants.INSTANCE_REBOOT_FULL))
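    # Reboot semantics implemented below: soft and hard reboots are handed
    # to the node via rpc.call_instance_reboot, while a full reboot is
    # emulated as shutdown + disk deactivation + disk activation + start.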
2046

    
2047
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2048
                       constants.INSTANCE_REBOOT_HARD]:
2049
      if not rpc.call_instance_reboot(node_current, instance,
2050
                                      reboot_type, extra_args):
2051
        raise errors.OpExecError("Could not reboot instance")
2052
    else:
2053
      if not rpc.call_instance_shutdown(node_current, instance):
2054
        raise errors.OpExecError("could not shutdown instance for full reboot")
2055
      _ShutdownInstanceDisks(instance, self.cfg)
2056
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2057
      if not rpc.call_instance_start(node_current, instance, extra_args):
2058
        _ShutdownInstanceDisks(instance, self.cfg)
2059
        raise errors.OpExecError("Could not start instance for full reboot")
2060

    
2061
    self.cfg.MarkInstanceUp(instance.name)
2062

    
2063

    
2064
class LUShutdownInstance(LogicalUnit):
2065
  """Shutdown an instance.
2066

2067
  """
2068
  HPATH = "instance-stop"
2069
  HTYPE = constants.HTYPE_INSTANCE
2070
  _OP_REQP = ["instance_name"]
2071

    
2072
  def BuildHooksEnv(self):
2073
    """Build hooks env.
2074

2075
    This runs on master, primary and secondary nodes of the instance.
2076

2077
    """
2078
    env = _BuildInstanceHookEnvByObject(self.instance)
2079
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2080
          list(self.instance.secondary_nodes))
2081
    return env, nl, nl
2082

    
2083
  def CheckPrereq(self):
2084
    """Check prerequisites.
2085

2086
    This checks that the instance is in the cluster.
2087

2088
    """
2089
    instance = self.cfg.GetInstanceInfo(
2090
      self.cfg.ExpandInstanceName(self.op.instance_name))
2091
    if instance is None:
2092
      raise errors.OpPrereqError("Instance '%s' not known" %
2093
                                 self.op.instance_name)
2094
    self.instance = instance
2095

    
2096
  def Exec(self, feedback_fn):
2097
    """Shutdown the instance.
2098

2099
    """
2100
    instance = self.instance
2101
    node_current = instance.primary_node
2102
    if not rpc.call_instance_shutdown(node_current, instance):
2103
      logger.Error("could not shutdown instance")
2104

    
2105
    self.cfg.MarkInstanceDown(instance.name)
2106
    _ShutdownInstanceDisks(instance, self.cfg)
2107

    
2108

    
2109
class LUReinstallInstance(LogicalUnit):
2110
  """Reinstall an instance.
2111

2112
  """
2113
  HPATH = "instance-reinstall"
2114
  HTYPE = constants.HTYPE_INSTANCE
2115
  _OP_REQP = ["instance_name"]
2116

    
2117
  def BuildHooksEnv(self):
2118
    """Build hooks env.
2119

2120
    This runs on master, primary and secondary nodes of the instance.
2121

2122
    """
2123
    env = _BuildInstanceHookEnvByObject(self.instance)
2124
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2125
          list(self.instance.secondary_nodes))
2126
    return env, nl, nl
2127

    
2128
  def CheckPrereq(self):
2129
    """Check prerequisites.
2130

2131
    This checks that the instance is in the cluster and is not running.
2132

2133
    """
2134
    instance = self.cfg.GetInstanceInfo(
2135
      self.cfg.ExpandInstanceName(self.op.instance_name))
2136
    if instance is None:
2137
      raise errors.OpPrereqError("Instance '%s' not known" %
2138
                                 self.op.instance_name)
2139
    if instance.disk_template == constants.DT_DISKLESS:
2140
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2141
                                 self.op.instance_name)
2142
    if instance.status != "down":
2143
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2144
                                 self.op.instance_name)
2145
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2146
    if remote_info:
2147
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2148
                                 (self.op.instance_name,
2149
                                  instance.primary_node))
2150

    
2151
    self.op.os_type = getattr(self.op, "os_type", None)
2152
    if self.op.os_type is not None:
2153
      # OS verification
2154
      pnode = self.cfg.GetNodeInfo(
2155
        self.cfg.ExpandNodeName(instance.primary_node))
2156
      if pnode is None:
2157
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2158
                                   self.op.pnode)
2159
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2160
      if not os_obj:
2161
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2162
                                   " primary node"  % self.op.os_type)
2163

    
2164
    self.instance = instance
2165

    
2166
  def Exec(self, feedback_fn):
2167
    """Reinstall the instance.
2168

2169
    """
2170
    inst = self.instance
2171

    
2172
    if self.op.os_type is not None:
2173
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2174
      inst.os = self.op.os_type
2175
      self.cfg.AddInstance(inst)
2176

    
2177
    _StartInstanceDisks(self.cfg, inst, None)
2178
    try:
2179
      feedback_fn("Running the instance OS create scripts...")
2180
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2181
        raise errors.OpExecError("Could not install OS for instance %s "
2182
                                 "on node %s" %
2183
                                 (inst.name, inst.primary_node))
2184
    finally:
2185
      _ShutdownInstanceDisks(inst, self.cfg)
2186

    
2187

    
2188
class LURenameInstance(LogicalUnit):
2189
  """Rename an instance.
2190

2191
  """
2192
  HPATH = "instance-rename"
2193
  HTYPE = constants.HTYPE_INSTANCE
2194
  _OP_REQP = ["instance_name", "new_name"]
2195

    
2196
  def BuildHooksEnv(self):
2197
    """Build hooks env.
2198

2199
    This runs on master, primary and secondary nodes of the instance.
2200

2201
    """
2202
    env = _BuildInstanceHookEnvByObject(self.instance)
2203
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2204
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2205
          list(self.instance.secondary_nodes))
2206
    return env, nl, nl
2207

    
2208
  def CheckPrereq(self):
2209
    """Check prerequisites.
2210

2211
    This checks that the instance is in the cluster and is not running.
2212

2213
    """
2214
    instance = self.cfg.GetInstanceInfo(
2215
      self.cfg.ExpandInstanceName(self.op.instance_name))
2216
    if instance is None:
2217
      raise errors.OpPrereqError("Instance '%s' not known" %
2218
                                 self.op.instance_name)
2219
    if instance.status != "down":
2220
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2221
                                 self.op.instance_name)
2222
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2223
    if remote_info:
2224
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2225
                                 (self.op.instance_name,
2226
                                  instance.primary_node))
2227
    self.instance = instance
2228

    
2229
    # new name verification
2230
    name_info = utils.HostInfo(self.op.new_name)
2231

    
2232
    self.op.new_name = new_name = name_info.name
2233
    if not getattr(self.op, "ignore_ip", False):
2234
      command = ["fping", "-q", name_info.ip]
2235
      result = utils.RunCmd(command)
2236
      if not result.failed:
2237
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2238
                                   (name_info.ip, new_name))
2239

    
2240

    
2241
  def Exec(self, feedback_fn):
2242
    """Reinstall the instance.
2243

2244
    """
2245
    inst = self.instance
2246
    old_name = inst.name
2247

    
2248
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2249

    
2250
    # re-read the instance from the configuration after rename
2251
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2252

    
2253
    _StartInstanceDisks(self.cfg, inst, None)
2254
    try:
2255
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2256
                                          "sda", "sdb"):
2257
        msg = ("Could run OS rename script for instance %s\n"
2258
               "on node %s\n"
2259
               "(but the instance has been renamed in Ganeti)" %
2260
               (inst.name, inst.primary_node))
2261
        logger.Error(msg)
2262
    finally:
2263
      _ShutdownInstanceDisks(inst, self.cfg)
2264

    
2265

    
2266
class LURemoveInstance(LogicalUnit):
2267
  """Remove an instance.
2268

2269
  """
2270
  HPATH = "instance-remove"
2271
  HTYPE = constants.HTYPE_INSTANCE
2272
  _OP_REQP = ["instance_name"]
2273

    
2274
  def BuildHooksEnv(self):
2275
    """Build hooks env.
2276

2277
    This runs on master, primary and secondary nodes of the instance.
2278

2279
    """
2280
    env = _BuildInstanceHookEnvByObject(self.instance)
2281
    nl = [self.sstore.GetMasterNode()]
2282
    return env, nl, nl
2283

    
2284
  def CheckPrereq(self):
2285
    """Check prerequisites.
2286

2287
    This checks that the instance is in the cluster.
2288

2289
    """
2290
    instance = self.cfg.GetInstanceInfo(
2291
      self.cfg.ExpandInstanceName(self.op.instance_name))
2292
    if instance is None:
2293
      raise errors.OpPrereqError("Instance '%s' not known" %
2294
                                 self.op.instance_name)
2295
    self.instance = instance
2296

    
2297
  def Exec(self, feedback_fn):
2298
    """Remove the instance.
2299

2300
    """
2301
    instance = self.instance
2302
    logger.Info("shutting down instance %s on node %s" %
2303
                (instance.name, instance.primary_node))
2304

    
2305
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2306
      if self.op.ignore_failures:
2307
        feedback_fn("Warning: can't shutdown instance")
2308
      else:
2309
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2310
                                 (instance.name, instance.primary_node))
2311

    
2312
    logger.Info("removing block devices for instance %s" % instance.name)
2313

    
2314
    if not _RemoveDisks(instance, self.cfg):
2315
      if self.op.ignore_failures:
2316
        feedback_fn("Warning: can't remove instance's disks")
2317
      else:
2318
        raise errors.OpExecError("Can't remove instance's disks")
2319

    
2320
    logger.Info("removing instance %s out of cluster config" % instance.name)
2321

    
2322
    self.cfg.RemoveInstance(instance.name)
2323

    
2324

    
2325
class LUQueryInstances(NoHooksLU):
2326
  """Logical unit for querying instances.
2327

2328
  """
2329
  _OP_REQP = ["output_fields", "names"]
2330

    
2331
  def CheckPrereq(self):
2332
    """Check prerequisites.
2333

2334
    This checks that the fields required are valid output fields.
2335

2336
    """
2337
    self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
2338
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2339
                               "admin_state", "admin_ram",
2340
                               "disk_template", "ip", "mac", "bridge",
2341
                               "sda_size", "sdb_size"],
2342
                       dynamic=self.dynamic_fields,
2343
                       selected=self.op.output_fields)
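    # The static fields are answered from the configuration alone, while the
    # dynamic ones (oper_state, oper_ram) need a live query of the primary
    # nodes, done in Exec via rpc.call_all_instances_info.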
2344

    
2345
    self.wanted = _GetWantedInstances(self, self.op.names)
2346

    
2347
  def Exec(self, feedback_fn):
2348
    """Computes the list of nodes and their attributes.
2349

2350
    """
2351
    instance_names = self.wanted
2352
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2353
                     in instance_names]
2354

    
2355
    # begin data gathering
2356

    
2357
    nodes = frozenset([inst.primary_node for inst in instance_list])
2358

    
2359
    bad_nodes = []
2360
    if self.dynamic_fields.intersection(self.op.output_fields):
2361
      live_data = {}
2362
      node_data = rpc.call_all_instances_info(nodes)
2363
      for name in nodes:
2364
        result = node_data[name]
2365
        if result:
2366
          live_data.update(result)
2367
        elif result == False:
2368
          bad_nodes.append(name)
2369
        # else no instance is alive
2370
    else:
2371
      live_data = dict([(name, {}) for name in instance_names])
2372

    
2373
    # end data gathering
2374

    
2375
    output = []
2376
    for instance in instance_list:
2377
      iout = []
2378
      for field in self.op.output_fields:
2379
        if field == "name":
2380
          val = instance.name
2381
        elif field == "os":
2382
          val = instance.os
2383
        elif field == "pnode":
2384
          val = instance.primary_node
2385
        elif field == "snodes":
2386
          val = list(instance.secondary_nodes)
2387
        elif field == "admin_state":
2388
          val = (instance.status != "down")
2389
        elif field == "oper_state":
2390
          if instance.primary_node in bad_nodes:
2391
            val = None
2392
          else:
2393
            val = bool(live_data.get(instance.name))
2394
        elif field == "admin_ram":
2395
          val = instance.memory
2396
        elif field == "oper_ram":
2397
          if instance.primary_node in bad_nodes:
2398
            val = None
2399
          elif instance.name in live_data:
2400
            val = live_data[instance.name].get("memory", "?")
2401
          else:
2402
            val = "-"
2403
        elif field == "disk_template":
2404
          val = instance.disk_template
2405
        elif field == "ip":
2406
          val = instance.nics[0].ip
2407
        elif field == "bridge":
2408
          val = instance.nics[0].bridge
2409
        elif field == "mac":
2410
          val = instance.nics[0].mac
2411
        elif field == "sda_size" or field == "sdb_size":
2412
          disk = instance.FindDisk(field[:3])
2413
          if disk is None:
2414
            val = None
2415
          else:
2416
            val = disk.size
2417
        else:
2418
          raise errors.ParameterError(field)
2419
        iout.append(val)
2420
      output.append(iout)
2421

    
2422
    return output
2423

    
2424

    
2425
class LUFailoverInstance(LogicalUnit):
2426
  """Failover an instance.
2427

2428
  """
2429
  HPATH = "instance-failover"
2430
  HTYPE = constants.HTYPE_INSTANCE
2431
  _OP_REQP = ["instance_name", "ignore_consistency"]
2432

    
2433
  def BuildHooksEnv(self):
2434
    """Build hooks env.
2435

2436
    This runs on master, primary and secondary nodes of the instance.
2437

2438
    """
2439
    env = {
2440
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2441
      }
2442
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2443
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2444
    return env, nl, nl
2445

    
2446
  def CheckPrereq(self):
2447
    """Check prerequisites.
2448

2449
    This checks that the instance is in the cluster.
2450

2451
    """
2452
    instance = self.cfg.GetInstanceInfo(
2453
      self.cfg.ExpandInstanceName(self.op.instance_name))
2454
    if instance is None:
2455
      raise errors.OpPrereqError("Instance '%s' not known" %
2456
                                 self.op.instance_name)
2457

    
2458
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2459
      raise errors.OpPrereqError("Instance's disk layout is not"
2460
                                 " network mirrored, cannot failover.")
2461

    
2462
    secondary_nodes = instance.secondary_nodes
2463
    if not secondary_nodes:
2464
      raise errors.ProgrammerError("no secondary node but using "
2465
                                   "DT_REMOTE_RAID1 template")
2466

    
2467
    # check memory requirements on the secondary node
2468
    target_node = secondary_nodes[0]
2469
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2470
    info = nodeinfo.get(target_node, None)
2471
    if not info:
2472
      raise errors.OpPrereqError("Cannot get current information"
2473
                                 " from node '%s'" % nodeinfo)
2474
    if instance.memory > info['memory_free']:
2475
      raise errors.OpPrereqError("Not enough memory on target node %s."
2476
                                 " %d MB available, %d MB required" %
2477
                                 (target_node, info['memory_free'],
2478
                                  instance.memory))
2479

    
2480
    # check bridge existence
2481
    brlist = [nic.bridge for nic in instance.nics]
2482
    if not rpc.call_bridges_exist(target_node, brlist):
2483
      raise errors.OpPrereqError("One or more target bridges %s does not"
2484
                                 " exist on destination node '%s'" %
2485
                                 (brlist, target_node))
2486

    
2487
    self.instance = instance
2488

    
2489
  def Exec(self, feedback_fn):
2490
    """Failover an instance.
2491

2492
    The failover is done by shutting it down on its present node and
2493
    starting it on the secondary.
2494

2495
    """
2496
    instance = self.instance
2497

    
2498
    source_node = instance.primary_node
2499
    target_node = instance.secondary_nodes[0]
2500

    
2501
    feedback_fn("* checking disk consistency between source and target")
2502
    for dev in instance.disks:
2503
      # for remote_raid1, these are md over drbd
2504
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2505
        if not self.op.ignore_consistency:
2506
          raise errors.OpExecError("Disk %s is degraded on target node,"
2507
                                   " aborting failover." % dev.iv_name)
2508

    
2509
    feedback_fn("* checking target node resource availability")
2510
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2511

    
2512
    if not nodeinfo:
2513
      raise errors.OpExecError("Could not contact target node %s." %
2514
                               target_node)
2515

    
2516
    free_memory = int(nodeinfo[target_node]['memory_free'])
2517
    memory = instance.memory
2518
    if memory > free_memory:
2519
      raise errors.OpExecError("Not enough memory to create instance %s on"
2520
                               " node %s. needed %s MiB, available %s MiB" %
2521
                               (instance.name, target_node, memory,
2522
                                free_memory))
2523

    
2524
    feedback_fn("* shutting down instance on source node")
2525
    logger.Info("Shutting down instance %s on node %s" %
2526
                (instance.name, source_node))
2527

    
2528
    if not rpc.call_instance_shutdown(source_node, instance):
2529
      if self.op.ignore_consistency:
2530
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2531
                     " anyway. Please make sure node %s is down"  %
2532
                     (instance.name, source_node, source_node))
2533
      else:
2534
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2535
                                 (instance.name, source_node))
2536

    
2537
    feedback_fn("* deactivating the instance's disks on source node")
2538
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2539
      raise errors.OpExecError("Can't shut down the instance's disks.")
2540

    
2541
    instance.primary_node = target_node
2542
    # distribute new instance config to the other nodes
2543
    self.cfg.AddInstance(instance)
2544

    
2545
    feedback_fn("* activating the instance's disks on target node")
2546
    logger.Info("Starting instance %s on node %s" %
2547
                (instance.name, target_node))
2548

    
2549
    disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2550
                                             ignore_secondaries=True)
2551
    if not disks_ok:
2552
      _ShutdownInstanceDisks(instance, self.cfg)
2553
      raise errors.OpExecError("Can't activate the instance's disks")
2554

    
2555
    feedback_fn("* starting the instance on the target node")
2556
    if not rpc.call_instance_start(target_node, instance, None):
2557
      _ShutdownInstanceDisks(instance, self.cfg)
2558
      raise errors.OpExecError("Could not start instance %s on node %s." %
2559
                               (instance.name, target_node))
2560

    
2561

    
2562
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2563
  """Create a tree of block devices on the primary node.
2564

2565
  This always creates all devices.
2566

2567
  """
2568
  if device.children:
2569
    for child in device.children:
2570
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2571
        return False
2572

    
2573
  cfg.SetDiskID(device, node)
2574
  new_id = rpc.call_blockdev_create(node, device, device.size,
2575
                                    instance.name, True, info)
2576
  if not new_id:
2577
    return False
2578
  if device.physical_id is None:
2579
    device.physical_id = new_id
2580
  return True
2581

    
2582

    
2583
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2584
  """Create a tree of block devices on a secondary node.
2585

2586
  If this device type has to be created on secondaries, create it and
2587
  all its children.
2588

2589
  If not, just recurse to children keeping the same 'force' value.
2590

2591
  """
2592
  if device.CreateOnSecondary():
2593
    force = True
2594
  if device.children:
2595
    for child in device.children:
2596
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
2597
                                        child, force, info):
2598
        return False
2599

    
2600
  if not force:
2601
    return True
2602
  cfg.SetDiskID(device, node)
2603
  new_id = rpc.call_blockdev_create(node, device, device.size,
2604
                                    instance.name, False, info)
2605
  if not new_id:
2606
    return False
2607
  if device.physical_id is None:
2608
    device.physical_id = new_id
2609
  return True
2610

    
2611

    
2612
def _GenerateUniqueNames(cfg, exts):
2613
  """Generate a suitable LV name.
2614

2615
  This will generate a logical volume name for the given instance.
2616

2617
  """
2618
  results = []
2619
  for val in exts:
2620
    new_id = cfg.GenerateUniqueID()
2621
    results.append("%s%s" % (new_id, val))
2622
  return results
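  # Example (illustrative only): _GenerateUniqueNames(cfg, [".sda", ".sdb"])
  # returns something like ["<unique-id-1>.sda", "<unique-id-2>.sdb"], each
  # entry getting its own ID from cfg.GenerateUniqueID().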
2623

    
2624

    
2625
def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2626
  """Generate a drbd device complete with its children.
2627

2628
  """
2629
  port = cfg.AllocatePort()
2630
  vgname = cfg.GetVGName()
2631
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2632
                          logical_id=(vgname, names[0]))
2633
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2634
                          logical_id=(vgname, names[1]))
2635
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2636
                          logical_id = (primary, secondary, port),
2637
                          children = [dev_data, dev_meta])
2638
  return drbd_dev
2639

    
2640

    
2641
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2642
  """Generate a drbd8 device complete with its children.
2643

2644
  """
2645
  port = cfg.AllocatePort()
2646
  vgname = cfg.GetVGName()
2647
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2648
                          logical_id=(vgname, names[0]))
2649
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2650
                          logical_id=(vgname, names[1]))
2651
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2652
                          logical_id = (primary, secondary, port),
2653
                          children = [dev_data, dev_meta],
2654
                          iv_name=iv_name)
2655
  return drbd_dev
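  # The device tree built above is, schematically (sizes in MiB):
  #   DRBD8(size), logical_id (primary, secondary, port)
  #     +- LV data of the requested size
  #     +- LV meta of a fixed 128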
2656

    
2657
def _GenerateDiskTemplate(cfg, template_name,
2658
                          instance_name, primary_node,
2659
                          secondary_nodes, disk_sz, swap_sz):
2660
  """Generate the entire disk layout for a given template type.
2661

2662
  """
2663
  #TODO: compute space requirements
2664

    
2665
  vgname = cfg.GetVGName()
2666
  if template_name == "diskless":
2667
    disks = []
2668
  elif template_name == "plain":
2669
    if len(secondary_nodes) != 0:
2670
      raise errors.ProgrammerError("Wrong template configuration")
2671

    
2672
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2673
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2674
                           logical_id=(vgname, names[0]),
2675
                           iv_name = "sda")
2676
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2677
                           logical_id=(vgname, names[1]),
2678
                           iv_name = "sdb")
2679
    disks = [sda_dev, sdb_dev]
2680
  elif template_name == "local_raid1":
2681
    if len(secondary_nodes) != 0:
2682
      raise errors.ProgrammerError("Wrong template configuration")
2683

    
2684

    
2685
    names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2686
                                       ".sdb_m1", ".sdb_m2"])
2687
    sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2688
                              logical_id=(vgname, names[0]))
2689
    sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2690
                              logical_id=(vgname, names[1]))
2691
    md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
2692
                              size=disk_sz,
2693
                              children = [sda_dev_m1, sda_dev_m2])
2694
    sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2695
                              logical_id=(vgname, names[2]))
2696
    sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2697
                              logical_id=(vgname, names[3]))
2698
    md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
2699
                              size=swap_sz,
2700
                              children = [sdb_dev_m1, sdb_dev_m2])
2701
    disks = [md_sda_dev, md_sdb_dev]
2702
  elif template_name == constants.DT_REMOTE_RAID1:
2703
    if len(secondary_nodes) != 1:
2704
      raise errors.ProgrammerError("Wrong template configuration")
2705
    remote_node = secondary_nodes[0]
2706
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2707
                                       ".sdb_data", ".sdb_meta"])
2708
    drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2709
                                         disk_sz, names[0:2])
2710
    md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
2711
                              children = [drbd_sda_dev], size=disk_sz)
2712
    drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2713
                                         swap_sz, names[2:4])
2714
    md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
2715
                              children = [drbd_sdb_dev], size=swap_sz)
2716
    disks = [md_sda_dev, md_sdb_dev]
2717
  elif template_name == constants.DT_DRBD8:
2718
    if len(secondary_nodes) != 1:
2719
      raise errors.ProgrammerError("Wrong template configuration")
2720
    remote_node = secondary_nodes[0]
2721
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2722
                                       ".sdb_data", ".sdb_meta"])
2723
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2724
                                         disk_sz, names[0:2], "sda")
2725
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2726
                                         swap_sz, names[2:4], "sdb")
2727
    disks = [drbd_sda_dev, drbd_sdb_dev]
2728
  else:
2729
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2730
  return disks
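  # Summary of the layouts built above:
  #   diskless     - no disks
  #   plain        - one LV per disk (sda, sdb) on the primary node only
  #   local_raid1  - each disk is an MD RAID1 over two local LVs
  #   remote_raid1 - each disk is an MD device on top of an LD_DRBD7 branch
  #   drbd8        - each disk is a DRBD8 device mirrored to the secondary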
2731

    
2732

    
2733
def _GetInstanceInfoText(instance):
2734
  """Compute that text that should be added to the disk's metadata.
2735

2736
  """
2737
  return "originstname+%s" % instance.name
2738

    
2739

    
2740
def _CreateDisks(cfg, instance):
2741
  """Create all disks for an instance.
2742

2743
  This abstracts away some work from AddInstance.
2744

2745
  Args:
2746
    instance: the instance object
2747

2748
  Returns:
2749
    True or False showing the success of the creation process
2750

2751
  """
2752
  info = _GetInstanceInfoText(instance)
2753

    
2754
  for device in instance.disks:
2755
    logger.Info("creating volume %s for instance %s" %
2756
              (device.iv_name, instance.name))
2757
    #HARDCODE
2758
    for secondary_node in instance.secondary_nodes:
2759
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2760
                                        device, False, info):
2761
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2762
                     (device.iv_name, device, secondary_node))
2763
        return False
2764
    #HARDCODE
2765
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2766
                                    instance, device, info):
2767
      logger.Error("failed to create volume %s on primary!" %
2768
                   device.iv_name)
2769
      return False
2770
  return True
2771

    
2772

    
2773
def _RemoveDisks(instance, cfg):
2774
  """Remove all disks for an instance.
2775

2776
  This abstracts away some work from `AddInstance()` and
2777
  `RemoveInstance()`. Note that in case some of the devices couldn't
2778
  be removed, the removal will continue with the other ones (compare
2779
  with `_CreateDisks()`).
2780

2781
  Args:
2782
    instance: the instance object
2783

2784
  Returns:
2785
    True or False showing the success of the removal process
2786

2787
  """
2788
  logger.Info("removing block devices for instance %s" % instance.name)
2789

    
2790
  result = True
2791
  for device in instance.disks:
2792
    for node, disk in device.ComputeNodeTree(instance.primary_node):
2793
      cfg.SetDiskID(disk, node)
2794
      if not rpc.call_blockdev_remove(node, disk):
2795
        logger.Error("could not remove block device %s on node %s,"
2796
                     " continuing anyway" %
2797
                     (device.iv_name, node))
2798
        result = False
2799
  return result
2800

    
2801

    
2802
class LUCreateInstance(LogicalUnit):
2803
  """Create an instance.
2804

2805
  """
2806
  HPATH = "instance-add"
2807
  HTYPE = constants.HTYPE_INSTANCE
2808
  _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2809
              "disk_template", "swap_size", "mode", "start", "vcpus",
2810
              "wait_for_sync", "ip_check"]
2811

    
2812
  def BuildHooksEnv(self):
2813
    """Build hooks env.
2814

2815
    This runs on master, primary and secondary nodes of the instance.
2816

2817
    """
2818
    env = {
2819
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2820
      "INSTANCE_DISK_SIZE": self.op.disk_size,
2821
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
2822
      "INSTANCE_ADD_MODE": self.op.mode,
2823
      }
2824
    if self.op.mode == constants.INSTANCE_IMPORT:
2825
      env["INSTANCE_SRC_NODE"] = self.op.src_node
2826
      env["INSTANCE_SRC_PATH"] = self.op.src_path
2827
      env["INSTANCE_SRC_IMAGE"] = self.src_image
2828

    
2829
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2830
      primary_node=self.op.pnode,
2831
      secondary_nodes=self.secondaries,
2832
      status=self.instance_status,
2833
      os_type=self.op.os_type,
2834
      memory=self.op.mem_size,
2835
      vcpus=self.op.vcpus,
2836
      nics=[(self.inst_ip, self.op.bridge)],
2837
    ))
2838

    
2839
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2840
          self.secondaries)
2841
    return env, nl, nl
2842

    
2843

    
2844
  def CheckPrereq(self):
2845
    """Check prerequisites.
2846

2847
    """
2848
    if self.op.mode not in (constants.INSTANCE_CREATE,
2849
                            constants.INSTANCE_IMPORT):
2850
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2851
                                 self.op.mode)
2852

    
2853
    if self.op.mode == constants.INSTANCE_IMPORT:
2854
      src_node = getattr(self.op, "src_node", None)
2855
      src_path = getattr(self.op, "src_path", None)
2856
      if src_node is None or src_path is None:
2857
        raise errors.OpPrereqError("Importing an instance requires source"
2858
                                   " node and path options")
2859
      src_node_full = self.cfg.ExpandNodeName(src_node)
2860
      if src_node_full is None:
2861
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2862
      self.op.src_node = src_node = src_node_full
2863

    
2864
      if not os.path.isabs(src_path):
2865
        raise errors.OpPrereqError("The source path must be absolute")
2866

    
2867
      export_info = rpc.call_export_info(src_node, src_path)
2868

    
2869
      if not export_info:
2870
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
2871

    
2872
      if not export_info.has_section(constants.INISECT_EXP):
2873
        raise errors.ProgrammerError("Corrupted export config")
2874

    
2875
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
2876
      if (int(ei_version) != constants.EXPORT_VERSION):
2877
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2878
                                   (ei_version, constants.EXPORT_VERSION))
2879

    
2880
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2881
        raise errors.OpPrereqError("Can't import instance with more than"
2882
                                   " one data disk")
2883

    
2884
      # FIXME: are the old os-es, disk sizes, etc. useful?
2885
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2886
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2887
                                                         'disk0_dump'))
2888
      self.src_image = diskimage
2889
    else: # INSTANCE_CREATE
2890
      if getattr(self.op, "os_type", None) is None:
2891
        raise errors.OpPrereqError("No guest OS specified")
2892

    
2893
    # check primary node
2894
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2895
    if pnode is None:
2896
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
2897
                                 self.op.pnode)
2898
    self.op.pnode = pnode.name
2899
    self.pnode = pnode
2900
    self.secondaries = []
2901
    # disk template and mirror node verification
2902
    if self.op.disk_template not in constants.DISK_TEMPLATES:
2903
      raise errors.OpPrereqError("Invalid disk template name")
2904

    
2905
    if self.op.disk_template in constants.DTS_NET_MIRROR:
2906
      if getattr(self.op, "snode", None) is None:
2907
        raise errors.OpPrereqError("The networked disk templates need"
2908
                                   " a mirror node")
2909

    
2910
      snode_name = self.cfg.ExpandNodeName(self.op.snode)
2911
      if snode_name is None:
2912
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
2913
                                   self.op.snode)
2914
      elif snode_name == pnode.name:
2915
        raise errors.OpPrereqError("The secondary node cannot be"
2916
                                   " the primary node.")
2917
      self.secondaries.append(snode_name)
2918

    
2919
    # Check lv size requirements
2920
    nodenames = [pnode.name] + self.secondaries
2921
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2922

    
2923
    # Required free disk space as a function of disk and swap space
2924
    req_size_dict = {
2925
      constants.DT_DISKLESS: 0,
2926
      constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2927
      constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2928
      # 256 MB are added for drbd metadata, 128MB for each drbd device
2929
      constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2930
      constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
2931
    }
2932

    
2933
    if self.op.disk_template not in req_size_dict:
2934
      raise errors.ProgrammerError("Disk template '%s' size requirement"
2935
                                   " is unknown" %  self.op.disk_template)
2936

    
2937
    req_size = req_size_dict[self.op.disk_template]
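    # Worked example (hypothetical sizes): a drbd8 instance with
    # disk_size=10240 and swap_size=4096 needs 10240 + 4096 + 256 = 14592 MB
    # of free space in the volume group on each of the two nodes.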
2938

    
2939
    for node in nodenames:
2940
      info = nodeinfo.get(node, None)
2941
      if not info:
2942
        raise errors.OpPrereqError("Cannot get current information"
2943
                                   " from node '%s'" % nodeinfo)
2944
      if req_size > info['vg_free']:
2945
        raise errors.OpPrereqError("Not enough disk space on target node %s."
2946
                                   " %d MB available, %d MB required" %
2947
                                   (node, info['vg_free'], req_size))
2948

    
2949
    # os verification
2950
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2951
    if not os_obj:
2952
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
2953
                                 " primary node"  % self.op.os_type)
2954

    
2955
    # instance verification
2956
    hostname1 = utils.HostInfo(self.op.instance_name)
2957

    
2958
    self.op.instance_name = instance_name = hostname1.name
2959
    instance_list = self.cfg.GetInstanceList()
2960
    if instance_name in instance_list:
2961
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2962
                                 instance_name)
2963

    
2964
    ip = getattr(self.op, "ip", None)
2965
    if ip is None or ip.lower() == "none":
2966
      inst_ip = None
2967
    elif ip.lower() == "auto":
2968
      inst_ip = hostname1.ip
2969
    else:
2970
      if not utils.IsValidIP(ip):
2971
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
2972
                                   " like a valid IP" % ip)
2973
      inst_ip = ip
2974
    self.inst_ip = inst_ip
2975

    
2976
    if self.op.start and not self.op.ip_check:
2977
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
2978
                                 " adding an instance in start mode")
2979

    
2980
    if self.op.ip_check:
2981
      if utils.TcpPing(utils.HostInfo().name, hostname1.ip,
2982
                       constants.DEFAULT_NODED_PORT):
2983
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2984
                                   (hostname1.ip, instance_name))
2985

    
2986
    # bridge verification
2987
    bridge = getattr(self.op, "bridge", None)
2988
    if bridge is None:
2989
      self.op.bridge = self.cfg.GetDefBridge()
2990
    else:
2991
      self.op.bridge = bridge
2992

    
2993
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
2994
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
2995
                                 " destination node '%s'" %
2996
                                 (self.op.bridge, pnode.name))
2997

    
2998
    if self.op.start:
2999
      self.instance_status = 'up'
3000
    else:
3001
      self.instance_status = 'down'
3002

    
3003
  def Exec(self, feedback_fn):
3004
    """Create and add the instance to the cluster.
3005

3006
    """
3007
    instance = self.op.instance_name
3008
    pnode_name = self.pnode.name
3009

    
3010
    nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
3011
    if self.inst_ip is not None:
3012
      nic.ip = self.inst_ip
3013

    
3014
    disks = _GenerateDiskTemplate(self.cfg,
3015
                                  self.op.disk_template,
3016
                                  instance, pnode_name,
3017
                                  self.secondaries, self.op.disk_size,
3018
                                  self.op.swap_size)
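    # 'disks' is the objects.Disk configuration tree for the chosen template
    # (sda/sdb plus any mirror layers); it is only instantiated on the nodes
    # by _CreateDisks further down.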
3019

    
3020
    iobj = objects.Instance(name=instance, os=self.op.os_type,
3021
                            primary_node=pnode_name,
3022
                            memory=self.op.mem_size,
3023
                            vcpus=self.op.vcpus,
3024
                            nics=[nic], disks=disks,
3025
                            disk_template=self.op.disk_template,
3026
                            status=self.instance_status,
3027
                            )
3028

    
3029
    feedback_fn("* creating instance disks...")
3030
    if not _CreateDisks(self.cfg, iobj):
3031
      _RemoveDisks(iobj, self.cfg)
3032
      raise errors.OpExecError("Device creation failed, reverting...")
3033

    
3034
    feedback_fn("adding instance %s to cluster config" % instance)
3035

    
3036
    self.cfg.AddInstance(iobj)
3037

    
3038
    if self.op.wait_for_sync:
3039
      disk_abort = not _WaitForSync(self.cfg, iobj)
3040
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
3041
      # make sure the disks are not degraded (still sync-ing is ok)
3042
      time.sleep(15)
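      # the fixed 15s pause is a heuristic: it gives the mirror time to start
      # syncing before the single (oneshot) status check below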
3043
      feedback_fn("* checking mirrors status")
3044
      disk_abort = not _WaitForSync(self.cfg, iobj, oneshot=True)
3045
    else:
3046
      disk_abort = False
3047

    
3048
    if disk_abort:
3049
      _RemoveDisks(iobj, self.cfg)
3050
      self.cfg.RemoveInstance(iobj.name)
3051
      raise errors.OpExecError("There are some degraded disks for"
3052
                               " this instance")
3053

    
3054
    feedback_fn("creating os for instance %s on node %s" %
3055
                (instance, pnode_name))
3056

    
3057
    if iobj.disk_template != constants.DT_DISKLESS:
3058
      if self.op.mode == constants.INSTANCE_CREATE:
3059
        feedback_fn("* running the instance OS create scripts...")
3060
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3061
          raise errors.OpExecError("could not add os for instance %s"
3062
                                   " on node %s" %
3063
                                   (instance, pnode_name))
3064

    
3065
      elif self.op.mode == constants.INSTANCE_IMPORT:
3066
        feedback_fn("* running the instance OS import scripts...")
3067
        src_node = self.op.src_node
3068
        src_image = self.src_image
3069
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3070
                                                src_node, src_image):
3071
          raise errors.OpExecError("Could not import os for instance"
3072
                                   " %s on node %s" %
3073
                                   (instance, pnode_name))
3074
      else:
3075
        # also checked in the prereq part
3076
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3077
                                     % self.op.mode)
3078

    
3079
    if self.op.start:
3080
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3081
      feedback_fn("* starting instance...")
3082
      if not rpc.call_instance_start(pnode_name, iobj, None):
3083
        raise errors.OpExecError("Could not start instance")
3084

    
3085

    
3086
class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]
3095

    
3096
  def CheckPrereq(self):
3097
    """Check prerequisites.
3098

3099
    This checks that the instance is in the cluster.
3100

3101
    """
3102
    instance = self.cfg.GetInstanceInfo(
3103
      self.cfg.ExpandInstanceName(self.op.instance_name))
3104
    if instance is None:
3105
      raise errors.OpPrereqError("Instance '%s' not known" %
3106
                                 self.op.instance_name)
3107
    self.instance = instance
3108

    
3109
  def Exec(self, feedback_fn):
3110
    """Connect to the console of an instance
3111

3112
    """
3113
    instance = self.instance
3114
    node = instance.primary_node
3115

    
3116
    node_insts = rpc.call_instance_list([node])[node]
3117
    if node_insts is False:
3118
      raise errors.OpExecError("Can't connect to node %s." % node)
3119

    
3120
    if instance.name not in node_insts:
3121
      raise errors.OpExecError("Instance %s is not running." % instance.name)
3122

    
3123
    logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3124

    
3125
    hyper = hypervisor.GetHypervisor()
3126
    console_cmd = hyper.GetShellCommandForConsole(instance.name)
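    # console_cmd is hypervisor specific; for Xen it would look roughly like
    # "xm console <instance-name>" (illustrative only, the exact string comes
    # from the hypervisor abstraction)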
3127
    # build ssh cmdline
3128
    argv = ["ssh", "-q", "-t"]
3129
    argv.extend(ssh.KNOWN_HOSTS_OPTS)
3130
    argv.extend(ssh.BATCH_MODE_OPTS)
3131
    argv.append(node)
3132
    argv.append(console_cmd)
3133
    return "ssh", argv
3134

    
3135

    
3136
class LUAddMDDRBDComponent(LogicalUnit):
  """Add a new mirror member to an instance's disk.

  """
  HPATH = "mirror-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3143

    
3144
  def BuildHooksEnv(self):
3145
    """Build hooks env.
3146

3147
    This runs on the master, the primary and all the secondaries.
3148

3149
    """
3150
    env = {
3151
      "NEW_SECONDARY": self.op.remote_node,
3152
      "DISK_NAME": self.op.disk_name,
3153
      }
3154
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3155
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3156
          self.op.remote_node,] + list(self.instance.secondary_nodes)
3157
    return env, nl, nl
3158

    
3159
  def CheckPrereq(self):
3160
    """Check prerequisites.
3161

3162
    This checks that the instance is in the cluster.
3163

3164
    """
3165
    instance = self.cfg.GetInstanceInfo(
3166
      self.cfg.ExpandInstanceName(self.op.instance_name))
3167
    if instance is None:
3168
      raise errors.OpPrereqError("Instance '%s' not known" %
3169
                                 self.op.instance_name)
3170
    self.instance = instance
3171

    
3172
    remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3173
    if remote_node is None:
3174
      raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3175
    self.remote_node = remote_node
3176

    
3177
    if remote_node == instance.primary_node:
3178
      raise errors.OpPrereqError("The specified node is the primary node of"
3179
                                 " the instance.")
3180

    
3181
    if instance.disk_template != constants.DT_REMOTE_RAID1:
3182
      raise errors.OpPrereqError("Instance's disk layout is not"
3183
                                 " remote_raid1.")
3184
    for disk in instance.disks:
3185
      if disk.iv_name == self.op.disk_name:
3186
        break
3187
    else:
3188
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
3189
                                 " instance." % self.op.disk_name)
3190
    if len(disk.children) > 1:
3191
      raise errors.OpPrereqError("The device already has two slave"
3192
                                 " devices.\n"
3193
                                 "This would create a 3-disk raid1"
3194
                                 " which we don't allow.")
3195
    self.disk = disk
3196

    
3197
  def Exec(self, feedback_fn):
3198
    """Add the mirror component
3199

3200
    """
3201
    disk = self.disk
3202
    instance = self.instance
3203

    
3204
    remote_node = self.remote_node
3205
    lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3206
    names = _GenerateUniqueNames(self.cfg, lv_names)
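    # e.g. for disk_name "sda" this requests unique LV names derived from
    # ".sda_data" and ".sda_meta"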
3207
    new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3208
                                     remote_node, disk.size, names)
3209

    
3210
    logger.Info("adding new mirror component on secondary")
3211
    #HARDCODE
3212
    if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
3213
                                      new_drbd, False,
3214
                                      _GetInstanceInfoText(instance)):
3215
      raise errors.OpExecError("Failed to create new component on secondary"
3216
                               " node %s" % remote_node)
3217

    
3218
    logger.Info("adding new mirror component on primary")
3219
    #HARDCODE
3220
    if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
3221
                                    instance, new_drbd,
3222
                                    _GetInstanceInfoText(instance)):
3223
      # remove secondary dev
3224
      self.cfg.SetDiskID(new_drbd, remote_node)
3225
      rpc.call_blockdev_remove(remote_node, new_drbd)
3226
      raise errors.OpExecError("Failed to create volume on primary")
3227

    
3228
    # the device exists now
3229
    # call the primary node to add the mirror to md
3230
    logger.Info("adding new mirror component to md")
3231
    if not rpc.call_blockdev_addchildren(instance.primary_node,
3232
                                         disk, [new_drbd]):
3233
      logger.Error("Can't add mirror compoment to md!")
3234
      self.cfg.SetDiskID(new_drbd, remote_node)
3235
      if not rpc.call_blockdev_remove(remote_node, new_drbd):
3236
        logger.Error("Can't rollback on secondary")
3237
      self.cfg.SetDiskID(new_drbd, instance.primary_node)
3238
      if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3239
        logger.Error("Can't rollback on primary")
3240
      raise errors.OpExecError("Can't add mirror component to md array")
3241

    
3242
    disk.children.append(new_drbd)
3243

    
3244
    self.cfg.AddInstance(instance)
3245

    
3246
    _WaitForSync(self.cfg, instance)
3247

    
3248
    return 0
3249

    
3250

    
3251
class LURemoveMDDRBDComponent(LogicalUnit):
3252
  """Remove a component from a remote_raid1 disk.
3253

3254
  """
3255
  HPATH = "mirror-remove"
3256
  HTYPE = constants.HTYPE_INSTANCE
3257
  _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3258

    
3259
  def BuildHooksEnv(self):
3260
    """Build hooks env.
3261

3262
    This runs on the master, the primary and all the secondaries.
3263

3264
    """
3265
    env = {
3266
      "DISK_NAME": self.op.disk_name,
3267
      "DISK_ID": self.op.disk_id,
3268
      "OLD_SECONDARY": self.old_secondary,
3269
      }
3270
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3271
    nl = [self.sstore.GetMasterNode(),
3272
          self.instance.primary_node] + list(self.instance.secondary_nodes)
3273
    return env, nl, nl
3274

    
3275
  def CheckPrereq(self):
3276
    """Check prerequisites.
3277

3278
    This checks that the instance is in the cluster.
3279

3280
    """
3281
    instance = self.cfg.GetInstanceInfo(
3282
      self.cfg.ExpandInstanceName(self.op.instance_name))
3283
    if instance is None:
3284
      raise errors.OpPrereqError("Instance '%s' not known" %
3285
                                 self.op.instance_name)
3286
    self.instance = instance
3287

    
3288
    if instance.disk_template != constants.DT_REMOTE_RAID1:
3289
      raise errors.OpPrereqError("Instance's disk layout is not"
3290
                                 " remote_raid1.")
3291
    for disk in instance.disks:
3292
      if disk.iv_name == self.op.disk_name:
3293
        break
3294
    else:
3295
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
3296
                                 " instance." % self.op.disk_name)
3297
    for child in disk.children:
3298
      if (child.dev_type == constants.LD_DRBD7 and
3299
          child.logical_id[2] == self.op.disk_id):
3300
        break
3301
    else:
3302
      raise errors.OpPrereqError("Can't find the device with this port.")
3303

    
3304
    if len(disk.children) < 2:
3305
      raise errors.OpPrereqError("Cannot remove the last component from"
3306
                                 " a mirror.")
3307
    self.disk = disk
3308
    self.child = child
3309
    if self.child.logical_id[0] == instance.primary_node:
3310
      oid = 1
3311
    else:
3312
      oid = 0
3313
    self.old_secondary = self.child.logical_id[oid]
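    # for these drbd children logical_id is (node_a, node_b, port); whichever
    # end is not the primary node is recorded as the old secondary for the
    # hooks environment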
3314

    
3315
  def Exec(self, feedback_fn):
3316
    """Remove the mirror component
3317

3318
    """
3319
    instance = self.instance
3320
    disk = self.disk
3321
    child = self.child
3322
    logger.Info("remove mirror component")
3323
    self.cfg.SetDiskID(disk, instance.primary_node)
3324
    if not rpc.call_blockdev_removechildren(instance.primary_node,
3325
                                            disk, [child]):
3326
      raise errors.OpExecError("Can't remove child from mirror.")
3327

    
3328
    for node in child.logical_id[:2]:
3329
      self.cfg.SetDiskID(child, node)
3330
      if not rpc.call_blockdev_remove(node, child):
3331
        logger.Error("Warning: failed to remove device from node %s,"
3332
                     " continuing operation." % node)
3333

    
3334
    disk.children.remove(child)
3335
    self.cfg.AddInstance(instance)
3336

    
3337

    
3338
class LUReplaceDisks(LogicalUnit):
3339
  """Replace the disks of an instance.
3340

3341
  """
3342
  HPATH = "mirrors-replace"
3343
  HTYPE = constants.HTYPE_INSTANCE
3344
  _OP_REQP = ["instance_name", "mode", "disks"]
3345

    
3346
  def BuildHooksEnv(self):
3347
    """Build hooks env.
3348

3349
    This runs on the master, the primary and all the secondaries.
3350

3351
    """
3352
    env = {
3353
      "MODE": self.op.mode,
3354
      "NEW_SECONDARY": self.op.remote_node,
3355
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
3356
      }
3357
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3358
    nl = [self.sstore.GetMasterNode(),
3359
          self.instance.primary_node] + list(self.instance.secondary_nodes)
3360
    return env, nl, nl
3361

    
3362
  def CheckPrereq(self):
3363
    """Check prerequisites.
3364

3365
    This checks that the instance is in the cluster.
3366

3367
    """
3368
    instance = self.cfg.GetInstanceInfo(
3369
      self.cfg.ExpandInstanceName(self.op.instance_name))
3370
    if instance is None:
3371
      raise errors.OpPrereqError("Instance '%s' not known" %
3372
                                 self.op.instance_name)
3373
    self.instance = instance
3374

    
3375
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3376
      raise errors.OpPrereqError("Instance's disk layout is not"
3377
                                 " network mirrored.")
3378

    
3379
    if len(instance.secondary_nodes) != 1:
3380
      raise errors.OpPrereqError("The instance has a strange layout,"
3381
                                 " expected one secondary but found %d" %
3382
                                 len(instance.secondary_nodes))
3383

    
3384
    self.sec_node = instance.secondary_nodes[0]
3385

    
3386
    remote_node = getattr(self.op, "remote_node", None)
3387
    if remote_node is not None:
3388
      remote_node = self.cfg.ExpandNodeName(remote_node)
3389
      if remote_node is None:
3390
        raise errors.OpPrereqError("Node '%s' not known" %
3391
                                   self.op.remote_node)
3392
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3393
    else:
3394
      self.remote_node_info = None
3395
    if remote_node == instance.primary_node:
3396
      raise errors.OpPrereqError("The specified node is the primary node of"
3397
                                 " the instance.")
3398
    elif remote_node == self.sec_node:
3399
      # the user gave the current secondary, switch to
3400
      # 'no-replace-secondary' mode
3401
      remote_node = None
3402
    if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3403
        self.op.mode != constants.REPLACE_DISK_ALL):
3404
      raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3405
                                 " disks replacement, not individual ones")
3406
    if instance.disk_template == constants.DT_DRBD8:
3407
      if self.op.mode == constants.REPLACE_DISK_ALL:
3408
        raise errors.OpPrereqError("Template 'drbd8' only allows primary or"
3409
                                   " secondary disk replacement, not"
3410
                                   " both at once")
3411
      elif self.op.mode == constants.REPLACE_DISK_PRI:
3412
        if remote_node is not None:
3413
          raise errors.OpPrereqError("Template 'drbd8' does not allow changing"
3414
                                     " the secondary while doing a primary"
3415
                                     " node disk replacement")
3416
        self.tgt_node = instance.primary_node
3417
      elif self.op.mode == constants.REPLACE_DISK_SEC:
3418
        self.new_node = remote_node # this can be None, in which case
3419
                                    # we don't change the secondary
3420
        self.tgt_node = instance.secondary_nodes[0]
3421
      else:
3422
        raise errors.ProgrammerError("Unhandled disk replace mode")
3423

    
3424
    for name in self.op.disks:
3425
      if instance.FindDisk(name) is None:
3426
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3427
                                   (name, instance.name))
3428
    self.op.remote_node = remote_node
3429

    
3430
  def _ExecRR1(self, feedback_fn):
3431
    """Replace the disks of an instance.
3432

3433
    """
3434
    instance = self.instance
3435
    iv_names = {}
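    # iv_names maps each disk's iv_name to (md_device, old_drbd_child,
    # new_drbd_child) so the post-sync loops below can verify and then drop
    # the old component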
3436
    # start of work
3437
    if self.op.remote_node is None:
3438
      remote_node = self.sec_node
3439
    else:
3440
      remote_node = self.op.remote_node
3441
    cfg = self.cfg
3442
    for dev in instance.disks:
3443
      size = dev.size
3444
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3445
      names = _GenerateUniqueNames(cfg, lv_names)
3446
      new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3447
                                       remote_node, size, names)
3448
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3449
      logger.Info("adding new mirror component on secondary for %s" %
3450
                  dev.iv_name)
3451
      #HARDCODE
3452
      if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3453
                                        new_drbd, False,
3454
                                        _GetInstanceInfoText(instance)):
3455
        raise errors.OpExecError("Failed to create new component on"
3456
                                 " secondary node %s\n"
3457
                                 "Full abort, cleanup manually!" %
3458
                                 remote_node)
3459

    
3460
      logger.Info("adding new mirror component on primary")
3461
      #HARDCODE
3462
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3463
                                      instance, new_drbd,
3464
                                      _GetInstanceInfoText(instance)):
3465
        # remove secondary dev
3466
        cfg.SetDiskID(new_drbd, remote_node)
3467
        rpc.call_blockdev_remove(remote_node, new_drbd)
3468
        raise errors.OpExecError("Failed to create volume on primary!\n"
3469
                                 "Full abort, cleanup manually!!")
3470

    
3471
      # the device exists now
3472
      # call the primary node to add the mirror to md
3473
      logger.Info("adding new mirror component to md")
3474
      if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3475
                                           [new_drbd]):
3476
        logger.Error("Can't add mirror compoment to md!")
3477
        cfg.SetDiskID(new_drbd, remote_node)
3478
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
3479
          logger.Error("Can't rollback on secondary")
3480
        cfg.SetDiskID(new_drbd, instance.primary_node)
3481
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3482
          logger.Error("Can't rollback on primary")
3483
        raise errors.OpExecError("Full abort, cleanup manually!!")
3484

    
3485
      dev.children.append(new_drbd)
3486
      cfg.AddInstance(instance)
3487

    
3488
    # this can fail as the old devices are degraded and _WaitForSync
3489
    # does a combined result over all disks, so we don't check its
3490
    # return value
3491
    _WaitForSync(cfg, instance, unlock=True)
3492

    
3493
    # so check manually all the devices
3494
    for name in iv_names:
3495
      dev, child, new_drbd = iv_names[name]
3496
      cfg.SetDiskID(dev, instance.primary_node)
3497
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3498
      if is_degr:
3499
        raise errors.OpExecError("MD device %s is degraded!" % name)
3500
      cfg.SetDiskID(new_drbd, instance.primary_node)
3501
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3502
      if is_degr:
3503
        raise errors.OpExecError("New drbd device %s is degraded!" % name)
3504

    
3505
    for name in iv_names:
3506
      dev, child, new_drbd = iv_names[name]
3507
      logger.Info("remove mirror %s component" % name)
3508
      cfg.SetDiskID(dev, instance.primary_node)
3509
      if not rpc.call_blockdev_removechildren(instance.primary_node,
3510
                                              dev, [child]):
3511
        logger.Error("Can't remove child from mirror, aborting"
3512
                     " *this device cleanup*.\nYou need to cleanup manually!!")
3513
        continue
3514

    
3515
      for node in child.logical_id[:2]:
3516
        logger.Info("remove child device on %s" % node)
3517
        cfg.SetDiskID(child, node)
3518
        if not rpc.call_blockdev_remove(node, child):
3519
          logger.Error("Warning: failed to remove device from node %s,"
3520
                       " continuing operation." % node)
3521

    
3522
      dev.children.remove(child)
3523

    
3524
      cfg.AddInstance(instance)
3525

    
3526
  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced.<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.
    """
3542
    instance = self.instance
3543
    iv_names = {}
3544
    vgname = self.cfg.GetVGName()
3545
    # start of work
3546
    cfg = self.cfg
3547
    tgt_node = self.tgt_node
3548
    for dev in instance.disks:
3549
      if not dev.iv_name in self.op.disks:
3550
        continue
3551
      size = dev.size
3552
      cfg.SetDiskID(dev, tgt_node)
3553
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3554
      names = _GenerateUniqueNames(cfg, lv_names)
3555
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3556
                             logical_id=(vgname, names[0]))
3557
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3558
                             logical_id=(vgname, names[1]))
3559
      new_lvs = [lv_data, lv_meta]
3560
      old_lvs = dev.children
3561
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3562
      logger.Info("adding new local storage on %s for %s" %
3563
                  (tgt_node, dev.iv_name))
3564
      # since we *always* want to create this LV, we use the
3565
      # _Create...OnPrimary (which forces the creation), even if we
3566
      # are talking about the secondary node
3567
      for new_lv in new_lvs:
3568
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3569
                                        _GetInstanceInfoText(instance)):
3570
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3571
                                   " node '%s'" %
3572
                                   (new_lv.logical_id[1], tgt_node))
3573

    
3574
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3575
        raise errors.OpExecError("Can't detach drbd from local storage on node"
3576
                                 " %s for device %s" % (tgt_node, dev.iv_name))
3577
      dev.children = []
3578
      cfg.Update(instance)
3579

    
3580
      # ok, we created the new LVs, so now we know we have the needed
3581
      # storage; as such, we proceed on the target node to rename
3582
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3583
      # using the assumption than logical_id == physical_id (which in
3584
      # turn is the unique_id on that node)
3585
      temp_suffix = int(time.time())
3586
      logger.Info("renaming the old LVs on the target node")
3587
      ren_fn = lambda d, suff: (d.physical_id[0],
3588
                                d.physical_id[1] + "_replaced-%s" % suff)
3589
      rlist = [(disk, ren_fn(disk, temp_suffix)) for disk in old_lvs]
3590
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3591
        logger.Error("Can't rename old LVs on node %s" % tgt_node)
3592
        do_change_old = False
3593
      else:
3594
        do_change_old = True
3595
      # now we rename the new LVs to the old LVs
3596
      logger.Info("renaming the new LVs on the target node")
3597
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3598
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3599
        logger.Error("Can't rename new LVs on node %s" % tgt_node)
3600
      else:
3601
        for old, new in zip(old_lvs, new_lvs):
3602
          new.logical_id = old.logical_id
3603
          cfg.SetDiskID(new, tgt_node)
3604

    
3605
      if do_change_old:
3606
        for disk in old_lvs:
3607
          disk.logical_id = ren_fn(disk, temp_suffix)
3608
          cfg.SetDiskID(disk, tgt_node)
3609

    
3610
      # now that the new lvs have the old name, we can add them to the device
3611
      logger.Info("adding new mirror component on %s" % tgt_node)
3612
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3613
        logger.Error("Can't add local storage to drbd!")
3614
        for new_lv in new_lvs:
3615
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
3616
            logger.Error("Can't rollback device %s")
3617
        return
3618

    
3619
      dev.children = new_lvs
3620
      cfg.Update(instance)
3621

    
3622

    
3623
    # this can fail as the old devices are degraded and _WaitForSync
3624
    # does a combined result over all disks, so we don't check its
3625
    # return value
3626
    logger.Info("Done changing drbd configs, waiting for sync")
3627
    _WaitForSync(cfg, instance, unlock=True)
3628

    
3629
    # so check manually all the devices
3630
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3631
      cfg.SetDiskID(dev, instance.primary_node)
3632
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3633
      if is_degr:
3634
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3635

    
3636
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3637
      logger.Info("remove logical volumes for %s" % name)
3638
      for lv in old_lvs:
3639
        cfg.SetDiskID(lv, tgt_node)
3640
        if not rpc.call_blockdev_remove(tgt_node, lv):
3641
          logger.Error("Can't cleanup child device, skipping. You need to"
3642
                       " fix manually!")
3643
          continue
3644

    
3645
  def _ExecD8Secondary(self, feedback_fn):
3646
    """Replace the secondary node for drbd8.
3647

3648
    The algorithm for replace is quite complicated:
3649
      - for all disks of the instance:
3650
        - create new LVs on the new node with same names
3651
        - shutdown the drbd device on the old secondary
3652
        - disconnect the drbd network on the primary
3653
        - create the drbd device on the new secondary
3654
        - network attach the drbd on the primary, using an artifice:
3655
          the drbd code for Attach() will connect to the network if it
3656
          finds a device which is connected to the good local disks but
3657
          not network enabled
3658
      - wait for sync across all devices
3659
      - remove all disks from the old secondary
3660

3661
    Failures are not very well handled.
3662
    """
3663
    instance = self.instance
3664
    iv_names = {}
3665
    vgname = self.cfg.GetVGName()
3666
    # start of work
3667
    cfg = self.cfg
3668
    old_node = self.tgt_node
3669
    new_node = self.new_node
3670
    pri_node = instance.primary_node
3671
    for dev in instance.disks:
3672
      size = dev.size
3673
      logger.Info("adding new local storage on %s for %s" %
3674
                  (new_node, dev.iv_name))
3675
      # since we *always* want to create this LV, we use the
3676
      # _Create...OnPrimary (which forces the creation), even if we
3677
      # are talking about the secondary node
3678
      for new_lv in dev.children:
3679
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3680
                                        _GetInstanceInfoText(instance)):
3681
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3682
                                   " node '%s'" %
3683
                                   (new_lv.logical_id[1], new_node))
3684

    
3685
      # create new devices on new_node
3686
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3687
                              logical_id=(pri_node, new_node,
3688
                                          dev.logical_id[2]),
3689
                              children=dev.children)
3690
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3691
                                        new_drbd, False,
3692
                                      _GetInstanceInfoText(instance)):
3693
        raise errors.OpExecError("Failed to create new DRBD on"
3694
                                 " node '%s'" % new_node)
3695

    
3696
      # we have new devices, shutdown the drbd on the old secondary
3697
      cfg.SetDiskID(dev, old_node)
3698
      if not rpc.call_blockdev_shutdown(old_node, dev):
3699
        raise errors.OpExecError("Failed to shutdown DRBD on old node")
3700

    
3701
      # we have new storage, we 'rename' the network on the primary
3702
      cfg.SetDiskID(dev, pri_node)
3703
      # rename to the ip of the new node
3704
      new_uid = list(dev.physical_id)
3705
      new_uid[2] = self.remote_node_info.secondary_ip
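      # the third element of physical_id is the remote peer's address here;
      # pointing it at the new secondary's secondary_ip lets the Attach()
      # trick described in the docstring reconnect the network side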
3706
      rlist = [(dev, tuple(new_uid))]
3707
      if not rpc.call_blockdev_rename(pri_node, rlist):
3708
        raise errors.OpExecError("Can't detach re-attach drbd %s on node"
3709
                                 " %s from %s to %s" %
3710
                                 (dev.iv_name, pri_node, old_node, new_node))
3711
      dev.logical_id = (pri_node, new_node, dev.logical_id[2])
3712
      cfg.SetDiskID(dev, pri_node)
3713
      cfg.Update(instance)
3714

    
3715
      iv_names[dev.iv_name] = (dev, dev.children)
3716

    
3717
    # this can fail as the old devices are degraded and _WaitForSync
3718
    # does a combined result over all disks, so we don't check its
3719
    # return value
3720
    logger.Info("Done changing drbd configs, waiting for sync")
3721
    _WaitForSync(cfg, instance, unlock=True)
3722

    
3723
    # so check manually all the devices
3724
    for name, (dev, old_lvs) in iv_names.iteritems():
3725
      cfg.SetDiskID(dev, pri_node)
3726
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3727
      if is_degr:
3728
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3729

    
3730
    for name, (dev, old_lvs) in iv_names.iteritems():
3731
      logger.Info("remove logical volumes for %s" % name)
3732
      for lv in old_lvs:
3733
        cfg.SetDiskID(lv, old_node)
3734
        if not rpc.call_blockdev_remove(old_node, lv):
3735
          logger.Error("Can't cleanup child device, skipping. You need to"
3736
                       " fix manually!")
3737
          continue
3738

    
3739
  def Exec(self, feedback_fn):
3740
    """Execute disk replacement.
3741

3742
    This dispatches the disk replacement to the appropriate handler.
3743

3744
    """
3745
    instance = self.instance
3746
    if instance.disk_template == constants.DT_REMOTE_RAID1:
3747
      fn = self._ExecRR1
3748
    elif instance.disk_template == constants.DT_DRBD8:
3749
      if self.op.remote_node is None:
3750
        fn = self._ExecD8DiskOnly
3751
      else:
3752
        fn = self._ExecD8Secondary
3753
    else:
3754
      raise errors.ProgrammerError("Unhandled disk replacement case")
3755
    return fn(feedback_fn)
3756

    
3757

    
3758
class LUQueryInstanceData(NoHooksLU):
3759
  """Query runtime instance data.
3760

3761
  """
3762
  _OP_REQP = ["instances"]
3763

    
3764
  def CheckPrereq(self):
3765
    """Check prerequisites.
3766

3767
    This only checks the optional instance list against the existing names.
3768

3769
    """
3770
    if not isinstance(self.op.instances, list):
3771
      raise errors.OpPrereqError("Invalid argument type 'instances'")
3772
    if self.op.instances:
3773
      self.wanted_instances = []
3774
      names = self.op.instances
3775
      for name in names:
3776
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3777
        if instance is None:
3778
          raise errors.OpPrereqError("No such instance name '%s'" % name)
3779
        self.wanted_instances.append(instance)
3780
    else:
3781
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3782
                               in self.cfg.GetInstanceList()]
3783
    return
3784

    
3785

    
3786
  def _ComputeDiskStatus(self, instance, snode, dev):
3787
    """Compute block device status.
3788

3789
    """
3790
    self.cfg.SetDiskID(dev, instance.primary_node)
3791
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
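    # dev_pstatus is the raw call_blockdev_find result (the same tuple whose
    # sixth element is used as the 'degraded' flag in LUReplaceDisks); it is
    # passed through to the caller unmodified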
3792
    if dev.dev_type in constants.LDS_DRBD:
3793
      # we change the snode then (otherwise we use the one passed in)
3794
      if dev.logical_id[0] == instance.primary_node:
3795
        snode = dev.logical_id[1]
3796
      else:
3797
        snode = dev.logical_id[0]
3798

    
3799
    if snode:
3800
      self.cfg.SetDiskID(dev, snode)
3801
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
3802
    else:
3803
      dev_sstatus = None
3804

    
3805
    if dev.children:
3806
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
3807
                      for child in dev.children]
3808
    else:
3809
      dev_children = []
3810

    
3811
    data = {
3812
      "iv_name": dev.iv_name,
3813
      "dev_type": dev.dev_type,
3814
      "logical_id": dev.logical_id,
3815
      "physical_id": dev.physical_id,
3816
      "pstatus": dev_pstatus,
3817
      "sstatus": dev_sstatus,
3818
      "children": dev_children,
3819
      }
3820

    
3821
    return data
3822

    
3823
  def Exec(self, feedback_fn):
3824
    """Gather and return data"""
3825
    result = {}
3826
    for instance in self.wanted_instances:
3827
      remote_info = rpc.call_instance_info(instance.primary_node,
3828
                                                instance.name)
3829
      if remote_info and "state" in remote_info:
3830
        remote_state = "up"
3831
      else:
3832
        remote_state = "down"
3833
      if instance.status == "down":
3834
        config_state = "down"
3835
      else:
3836
        config_state = "up"
3837

    
3838
      disks = [self._ComputeDiskStatus(instance, None, device)
3839
               for device in instance.disks]
3840

    
3841
      idict = {
3842
        "name": instance.name,
3843
        "config_state": config_state,
3844
        "run_state": remote_state,
3845
        "pnode": instance.primary_node,
3846
        "snodes": instance.secondary_nodes,
3847
        "os": instance.os,
3848
        "memory": instance.memory,
3849
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
3850
        "disks": disks,
3851
        "vcpus": instance.vcpus,
3852
        }
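      # illustrative shape of one entry (values made up):
      #   {"name": "instance1.example.com", "config_state": "up",
      #    "run_state": "down", "pnode": "node1", "snodes": ["node2"], ...}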
3853

    
3854
      result[instance.name] = idict
3855

    
3856
    return result
3857

    
3858

    
3859
class LUSetInstanceParms(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
3866

    
3867
  def BuildHooksEnv(self):
3868
    """Build hooks env.
3869

3870
    This runs on the master, primary and secondaries.
3871

3872
    """
3873
    args = dict()
3874
    if self.mem:
3875
      args['memory'] = self.mem
3876
    if self.vcpus:
3877
      args['vcpus'] = self.vcpus
3878
    if self.do_ip or self.do_bridge:
3879
      if self.do_ip:
3880
        ip = self.ip
3881
      else:
3882
        ip = self.instance.nics[0].ip
3883
      if self.bridge:
3884
        bridge = self.bridge
3885
      else:
3886
        bridge = self.instance.nics[0].bridge
3887
      args['nics'] = [(ip, bridge)]
3888
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
3889
    nl = [self.sstore.GetMasterNode(),
3890
          self.instance.primary_node] + list(self.instance.secondary_nodes)
3891
    return env, nl, nl
3892

    
3893
  def CheckPrereq(self):
3894
    """Check prerequisites.
3895

3896
    This only checks the instance list against the existing names.
3897

3898
    """
3899
    self.mem = getattr(self.op, "mem", None)
3900
    self.vcpus = getattr(self.op, "vcpus", None)
3901
    self.ip = getattr(self.op, "ip", None)
3902
    self.bridge = getattr(self.op, "bridge", None)
3903
    if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
3904
      raise errors.OpPrereqError("No changes submitted")
3905
    if self.mem is not None:
3906
      try:
3907
        self.mem = int(self.mem)
3908
      except ValueError, err:
3909
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
3910
    if self.vcpus is not None:
3911
      try:
3912
        self.vcpus = int(self.vcpus)
3913
      except ValueError, err:
3914
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
3915
    if self.ip is not None:
3916
      self.do_ip = True
3917
      if self.ip.lower() == "none":
3918
        self.ip = None
3919
      else:
3920
        if not utils.IsValidIP(self.ip):
3921
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
3922
    else:
3923
      self.do_ip = False
3924
    self.do_bridge = (self.bridge is not None)
3925

    
3926
    instance = self.cfg.GetInstanceInfo(
3927
      self.cfg.ExpandInstanceName(self.op.instance_name))
3928
    if instance is None:
3929
      raise errors.OpPrereqError("No such instance name '%s'" %
3930
                                 self.op.instance_name)
3931
    self.op.instance_name = instance.name
3932
    self.instance = instance
3933
    return
3934

    
3935
  def Exec(self, feedback_fn):
3936
    """Modifies an instance.
3937

3938
    All parameters take effect only at the next restart of the instance.
3939
    """
3940
    result = []
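    # result collects (parameter, new value) pairs describing exactly what
    # was changed, e.g. [("mem", 512), ("bridge", "xen-br0")] (illustrative)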
3941
    instance = self.instance
3942
    if self.mem:
3943
      instance.memory = self.mem
3944
      result.append(("mem", self.mem))
3945
    if self.vcpus:
3946
      instance.vcpus = self.vcpus
3947
      result.append(("vcpus",  self.vcpus))
3948
    if self.do_ip:
3949
      instance.nics[0].ip = self.ip
3950
      result.append(("ip", self.ip))
3951
    if self.bridge:
3952
      instance.nics[0].bridge = self.bridge
3953
      result.append(("bridge", self.bridge))
3954

    
3955
    self.cfg.AddInstance(instance)
3956

    
3957
    return result
3958

    
3959

    
3960
class LUQueryExports(NoHooksLU):
3961
  """Query the exports list
3962

3963
  """
3964
  _OP_REQP = []
3965

    
3966
  def CheckPrereq(self):
3967
    """Check that the nodelist contains only existing nodes.
3968

3969
    """
3970
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
3971

    
3972
  def Exec(self, feedback_fn):
3973
    """Compute the list of all the exported system images.
3974

3975
    Returns:
3976
      a dictionary with the structure node->(export-list)
3977
      where export-list is a list of the instances exported on
3978
      that node.
3979

3980
    """
3981
    return rpc.call_export_list(self.nodes)
3982

    
3983

    
3984
class LUExportInstance(LogicalUnit):
3985
  """Export an instance to an image in the cluster.
3986

3987
  """
3988
  HPATH = "instance-export"
3989
  HTYPE = constants.HTYPE_INSTANCE
3990
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
3991

    
3992
  def BuildHooksEnv(self):
3993
    """Build hooks env.
3994

3995
    This will run on the master, primary node and target node.
3996

3997
    """
3998
    env = {
3999
      "EXPORT_NODE": self.op.target_node,
4000
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4001
      }
4002
    env.update(_BuildInstanceHookEnvByObject(self.instance))
4003
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4004
          self.op.target_node]
4005
    return env, nl, nl
4006

    
4007
  def CheckPrereq(self):
4008
    """Check prerequisites.
4009

4010
    This checks that the instance name is a valid one.
4011

4012
    """
4013
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4014
    self.instance = self.cfg.GetInstanceInfo(instance_name)
4015
    if self.instance is None:
4016
      raise errors.OpPrereqError("Instance '%s' not found" %
4017
                                 self.op.instance_name)
4018

    
4019
    # node verification
4020
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4021
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4022

    
4023
    if self.dst_node is None:
4024
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
4025
                                 self.op.target_node)
4026
    self.op.target_node = self.dst_node.name
4027

    
4028
  def Exec(self, feedback_fn):
4029
    """Export an instance to an image in the cluster.
4030

4031
    """
4032
    instance = self.instance
4033
    dst_node = self.dst_node
4034
    src_node = instance.primary_node
4035
    # shutdown the instance, unless requested not to do so
4036
    if self.op.shutdown:
4037
      op = opcodes.OpShutdownInstance(instance_name=instance.name)
4038
      self.processor.ChainOpCode(op)
4039

    
4040
    vgname = self.cfg.GetVGName()
4041

    
4042
    snap_disks = []
4043

    
4044
    try:
4045
      for disk in instance.disks:
4046
        if disk.iv_name == "sda":
4047
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4048
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4049

    
4050
          if not new_dev_name:
4051
            logger.Error("could not snapshot block device %s on node %s" %
4052
                         (disk.logical_id[1], src_node))
4053
          else:
4054
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4055
                                      logical_id=(vgname, new_dev_name),
4056
                                      physical_id=(vgname, new_dev_name),
4057
                                      iv_name=disk.iv_name)
4058
            snap_disks.append(new_dev)
4059

    
4060
    finally:
4061
      if self.op.shutdown:
4062
        op = opcodes.OpStartupInstance(instance_name=instance.name,
4063
                                       force=False)
4064
        self.processor.ChainOpCode(op)
4065

    
4066
    # TODO: check for size
4067

    
4068
    for dev in snap_disks:
4069
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
4070
                                           instance):
4071
        logger.Error("could not export block device %s from node"
4072
                     " %s to node %s" %
4073
                     (dev.logical_id[1], src_node, dst_node.name))
4074
      if not rpc.call_blockdev_remove(src_node, dev):
4075
        logger.Error("could not remove snapshot block device %s from"
4076
                     " node %s" % (dev.logical_id[1], src_node))
4077

    
4078
    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4079
      logger.Error("could not finalize export for instance %s on node %s" %
4080
                   (instance.name, dst_node.name))
4081

    
4082
    nodelist = self.cfg.GetNodeList()
4083
    nodelist.remove(dst_node.name)
4084

    
4085
    # on one-node clusters nodelist will be empty after the removal; if we
    # proceeded, the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
4088
    if nodelist:
4089
      op = opcodes.OpQueryExports(nodes=nodelist)
4090
      exportlist = self.processor.ChainOpCode(op)
4091
      for node in exportlist:
4092
        if instance.name in exportlist[node]:
4093
          if not rpc.call_export_remove(node, instance.name):
4094
            logger.Error("could not remove older export for instance %s"
4095
                         " on node %s" % (instance.name, node))
4096

    
4097

    
4098
class TagsLU(NoHooksLU):
4099
  """Generic tags LU.
4100

4101
  This is an abstract class which is the parent of all the other tags LUs.
4102

4103
  """
4104
  def CheckPrereq(self):
4105
    """Check prerequisites.
4106

4107
    """
4108
    if self.op.kind == constants.TAG_CLUSTER:
4109
      self.target = self.cfg.GetClusterInfo()
4110
    elif self.op.kind == constants.TAG_NODE:
4111
      name = self.cfg.ExpandNodeName(self.op.name)
4112
      if name is None:
4113
        raise errors.OpPrereqError("Invalid node name (%s)" %
4114
                                   (self.op.name,))
4115
      self.op.name = name
4116
      self.target = self.cfg.GetNodeInfo(name)
4117
    elif self.op.kind == constants.TAG_INSTANCE:
4118
      name = self.cfg.ExpandInstanceName(self.op.name)
4119
      if name is None:
4120
        raise errors.OpPrereqError("Invalid instance name (%s)" %
4121
                                   (self.op.name,))
4122
      self.op.name = name
4123
      self.target = self.cfg.GetInstanceInfo(name)
4124
    else:
4125
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4126
                                 str(self.op.kind))
4127

    
4128

    
4129
class LUGetTags(TagsLU):
4130
  """Returns the tags of a given object.
4131

4132
  """
4133
  _OP_REQP = ["kind", "name"]
4134

    
4135
  def Exec(self, feedback_fn):
4136
    """Returns the tag list.
4137

4138
    """
4139
    return self.target.GetTags()
4140

    
4141

    
4142
class LUSearchTags(NoHooksLU):
4143
  """Searches the tags for a given pattern.
4144

4145
  """
4146
  _OP_REQP = ["pattern"]
4147

    
4148
  def CheckPrereq(self):
4149
    """Check prerequisites.
4150

4151
    This checks the pattern passed for validity by compiling it.
4152

4153
    """
4154
    try:
4155
      self.re = re.compile(self.op.pattern)
4156
    except re.error, err:
4157
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4158
                                 (self.op.pattern, err))
4159

    
4160
  def Exec(self, feedback_fn):
4161
    """Returns the tag list.
4162

4163
    """
4164
    cfg = self.cfg
4165
    tgts = [("/cluster", cfg.GetClusterInfo())]
4166
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4167
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4168
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4169
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4170
    results = []
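    # results is a list of (path, tag) pairs, e.g.
    # ("/instances/instance1.example.com", "web") (illustrative)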
4171
    for path, target in tgts:
4172
      for tag in target.GetTags():
4173
        if self.re.search(tag):
4174
          results.append((path, tag))
4175
    return results
4176

    
4177

    
4178
class LUAddTags(TagsLU):
4179
  """Sets a tag on a given object.
4180

4181
  """
4182
  _OP_REQP = ["kind", "name", "tags"]
4183

    
4184
  def CheckPrereq(self):
4185
    """Check prerequisites.
4186

4187
    This checks the type and length of the tag name and value.
4188

4189
    """
4190
    TagsLU.CheckPrereq(self)
4191
    for tag in self.op.tags:
4192
      objects.TaggableObject.ValidateTag(tag)
4193

    
4194
  def Exec(self, feedback_fn):
4195
    """Sets the tag.
4196

4197
    """
4198
    try:
4199
      for tag in self.op.tags:
4200
        self.target.AddTag(tag)
4201
    except errors.TagError, err:
4202
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
4203
    try:
4204
      self.cfg.Update(self.target)
4205
    except errors.ConfigurationError:
4206
      raise errors.OpRetryError("There has been a modification to the"
4207
                                " config file and the operation has been"
4208
                                " aborted. Please retry.")
4209

    
4210

    
4211
class LUDelTags(TagsLU):
4212
  """Delete a list of tags from a given object.
4213

4214
  """
4215
  _OP_REQP = ["kind", "name", "tags"]
4216

    
4217
  def CheckPrereq(self):
4218
    """Check prerequisites.
4219

4220
    This checks that we have the given tag.
4221

4222
    """
4223
    TagsLU.CheckPrereq(self)
4224
    for tag in self.op.tags:
4225
      objects.TaggableObject.ValidateTag(tag)
4226
    del_tags = frozenset(self.op.tags)
4227
    cur_tags = self.target.GetTags()
4228
    if not del_tags <= cur_tags:
4229
      diff_tags = del_tags - cur_tags
4230
      diff_names = ["'%s'" % tag for tag in diff_tags]
4231
      diff_names.sort()
4232
      raise errors.OpPrereqError("Tag(s) %s not found" %
4233
                                 (",".join(diff_names)))
4234

    
4235
  def Exec(self, feedback_fn):
4236
    """Remove the tag from the object.
4237

4238
    """
4239
    for tag in self.op.tags:
4240
      self.target.RemoveTag(tag)
4241
    try:
4242
      self.cfg.Update(self.target)
4243
    except errors.ConfigurationError:
4244
      raise errors.OpRetryError("There has been a modification to the"
4245
                                " config file and the operation has been"
4246
                                " aborted. Please retry.")