Verify: save instance config
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33 import simplejson
34
35 from ganeti import rpc
36 from ganeti import ssh
37 from ganeti import logger
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import config
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import ssconf
46
47
48 # Check whether the simplejson module supports indentation
49 _JSON_INDENT = 2
50 try:
51   simplejson.dumps(1, indent=_JSON_INDENT)
52 except TypeError:
53   _JSON_INDENT = None
54
55
56 class LogicalUnit(object):
57   """Logical Unit base class.
58
59   Subclasses must follow these rules:
60     - implement CheckPrereq which also fills in the opcode instance
61       with all the fields (even if as None)
62     - implement Exec
63     - implement BuildHooksEnv
64     - redefine HPATH and HTYPE
65     - optionally redefine their run requirements (REQ_CLUSTER,
66       REQ_MASTER); note that all commands require root permissions
67
68   """
69   HPATH = None
70   HTYPE = None
71   _OP_REQP = []
72   REQ_CLUSTER = True
73   REQ_MASTER = True
74
75   def __init__(self, processor, op, cfg, sstore):
76     """Constructor for LogicalUnit.
77
78     This needs to be overriden in derived classes in order to check op
79     validity.
80
81     """
82     self.proc = processor
83     self.op = op
84     self.cfg = cfg
85     self.sstore = sstore
86     for attr_name in self._OP_REQP:
87       attr_val = getattr(op, attr_name, None)
88       if attr_val is None:
89         raise errors.OpPrereqError("Required parameter '%s' missing" %
90                                    attr_name)
91     if self.REQ_CLUSTER:
92       if not cfg.IsCluster():
93         raise errors.OpPrereqError("Cluster not initialized yet,"
94                                    " use 'gnt-cluster init' first.")
95       if self.REQ_MASTER:
96         master = sstore.GetMasterNode()
97         if master != utils.HostInfo().name:
98           raise errors.OpPrereqError("Commands must be run on the master"
99                                      " node %s" % master)
100
101   def CheckPrereq(self):
102     """Check prerequisites for this LU.
103
104     This method should check that the prerequisites for the execution
105     of this LU are fulfilled. It can do internode communication, but
106     it should be idempotent - no cluster or system changes are
107     allowed.
108
109     The method should raise errors.OpPrereqError in case something is
110     not fulfilled. Its return value is ignored.
111
112     This method should also update all the parameters of the opcode to
113     their canonical form; e.g. a short node name must be fully
114     expanded after this method has successfully completed (so that
115     hooks, logging, etc. work correctly).
116
117     """
118     raise NotImplementedError
119
120   def Exec(self, feedback_fn):
121     """Execute the LU.
122
123     This method should implement the actual work. It should raise
124     errors.OpExecError for failures that are somewhat dealt with in
125     code, or expected.
126
127     """
128     raise NotImplementedError
129
130   def BuildHooksEnv(self):
131     """Build hooks environment for this LU.
132
133     This method should return a three-node tuple consisting of: a dict
134     containing the environment that will be used for running the
135     specific hook for this LU, a list of node names on which the hook
136     should run before the execution, and a list of node names on which
137     the hook should run after the execution.
138
139     The keys of the dict must not have 'GANETI_' prefixed as this will
140     be handled in the hooks runner. Also note additional keys will be
141     added by the hooks runner. If the LU doesn't define any
142     environment, an empty dict (and not None) should be returned.
143
144     As for the node lists, the master should not be included in the
145     them, as it will be added by the hooks runner in case this LU
146     requires a cluster to run on (otherwise we don't have a node
147     list). No nodes should be returned as an empty list (and not
148     None).
149
150     Note that if the HPATH for a LU class is None, this function will
151     not be called.
152
153     """
154     raise NotImplementedError
155
156
157 class NoHooksLU(LogicalUnit):
158   """Simple LU which runs no hooks.
159
160   This LU is intended as a parent for other LogicalUnits which will
161   run no hooks, in order to reduce duplicate code.
162
163   """
164   HPATH = None
165   HTYPE = None
166
167   def BuildHooksEnv(self):
168     """Build hooks env.
169
170     This is a no-op, since we don't run hooks.
171
172     """
173     return {}, [], []
174
175
176 def _AddHostToEtcHosts(hostname):
177   """Wrapper around utils.SetEtcHostsEntry.
178
179   """
180   hi = utils.HostInfo(name=hostname)
181   utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
182
183
184 def _RemoveHostFromEtcHosts(hostname):
185   """Wrapper around utils.RemoveEtcHostsEntry.
186
187   """
188   hi = utils.HostInfo(name=hostname)
189   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
190   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
191
192
193 def _GetWantedNodes(lu, nodes):
194   """Returns list of checked and expanded node names.
195
196   Args:
197     nodes: List of nodes (strings) or None for all
198
199   """
200   if not isinstance(nodes, list):
201     raise errors.OpPrereqError("Invalid argument type 'nodes'")
202
203   if nodes:
204     wanted = []
205
206     for name in nodes:
207       node = lu.cfg.ExpandNodeName(name)
208       if node is None:
209         raise errors.OpPrereqError("No such node name '%s'" % name)
210       wanted.append(node)
211
212   else:
213     wanted = lu.cfg.GetNodeList()
214   return utils.NiceSort(wanted)
215
216
217 def _GetWantedInstances(lu, instances):
218   """Returns list of checked and expanded instance names.
219
220   Args:
221     instances: List of instances (strings) or None for all
222
223   """
224   if not isinstance(instances, list):
225     raise errors.OpPrereqError("Invalid argument type 'instances'")
226
227   if instances:
228     wanted = []
229
230     for name in instances:
231       instance = lu.cfg.ExpandInstanceName(name)
232       if instance is None:
233         raise errors.OpPrereqError("No such instance name '%s'" % name)
234       wanted.append(instance)
235
236   else:
237     wanted = lu.cfg.GetInstanceList()
238   return utils.NiceSort(wanted)
239
240
241 def _CheckOutputFields(static, dynamic, selected):
242   """Checks whether all selected fields are valid.
243
244   Args:
245     static: Static fields
246     dynamic: Dynamic fields
247
248   """
249   static_fields = frozenset(static)
250   dynamic_fields = frozenset(dynamic)
251
252   all_fields = static_fields | dynamic_fields
253
254   if not all_fields.issuperset(selected):
255     raise errors.OpPrereqError("Unknown output fields selected: %s"
256                                % ",".join(frozenset(selected).
257                                           difference(all_fields)))
258
259
260 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
261                           memory, vcpus, nics):
262   """Builds instance related env variables for hooks from single variables.
263
264   Args:
265     secondary_nodes: List of secondary nodes as strings
266   """
267   env = {
268     "OP_TARGET": name,
269     "INSTANCE_NAME": name,
270     "INSTANCE_PRIMARY": primary_node,
271     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
272     "INSTANCE_OS_TYPE": os_type,
273     "INSTANCE_STATUS": status,
274     "INSTANCE_MEMORY": memory,
275     "INSTANCE_VCPUS": vcpus,
276   }
277
278   if nics:
279     nic_count = len(nics)
280     for idx, (ip, bridge, mac) in enumerate(nics):
281       if ip is None:
282         ip = ""
283       env["INSTANCE_NIC%d_IP" % idx] = ip
284       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
285       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
286   else:
287     nic_count = 0
288
289   env["INSTANCE_NIC_COUNT"] = nic_count
290
291   return env
292
293
294 def _BuildInstanceHookEnvByObject(instance, override=None):
295   """Builds instance related env variables for hooks from an object.
296
297   Args:
298     instance: objects.Instance object of instance
299     override: dict of values to override
300   """
301   args = {
302     'name': instance.name,
303     'primary_node': instance.primary_node,
304     'secondary_nodes': instance.secondary_nodes,
305     'os_type': instance.os,
306     'status': instance.os,
307     'memory': instance.memory,
308     'vcpus': instance.vcpus,
309     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
310   }
311   if override:
312     args.update(override)
313   return _BuildInstanceHookEnv(**args)
314
315
316 def _UpdateKnownHosts(fullnode, ip, pubkey):
317   """Ensure a node has a correct known_hosts entry.
318
319   Args:
320     fullnode - Fully qualified domain name of host. (str)
321     ip       - IPv4 address of host (str)
322     pubkey   - the public key of the cluster
323
324   """
325   if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
326     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
327   else:
328     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
329
330   inthere = False
331
332   save_lines = []
333   add_lines = []
334   removed = False
335
336   for rawline in f:
337     logger.Debug('read %s' % (repr(rawline),))
338
339     parts = rawline.rstrip('\r\n').split()
340
341     # Ignore unwanted lines
342     if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
343       fields = parts[0].split(',')
344       key = parts[2]
345
346       haveall = True
347       havesome = False
348       for spec in [ ip, fullnode ]:
349         if spec not in fields:
350           haveall = False
351         if spec in fields:
352           havesome = True
353
354       logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
355       if haveall and key == pubkey:
356         inthere = True
357         save_lines.append(rawline)
358         logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
359         continue
360
361       if havesome and (not haveall or key != pubkey):
362         removed = True
363         logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
364         continue
365
366     save_lines.append(rawline)
367
368   if not inthere:
369     add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
370     logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
371
372   if removed:
373     save_lines = save_lines + add_lines
374
375     # Write a new file and replace old.
376     fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
377                                    constants.DATA_DIR)
378     newfile = os.fdopen(fd, 'w')
379     try:
380       newfile.write(''.join(save_lines))
381     finally:
382       newfile.close()
383     logger.Debug("Wrote new known_hosts.")
384     os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
385
386   elif add_lines:
387     # Simply appending a new line will do the trick.
388     f.seek(0, 2)
389     for add in add_lines:
390       f.write(add)
391
392   f.close()
393
394
395 def _HasValidVG(vglist, vgname):
396   """Checks if the volume group list is valid.
397
398   A non-None return value means there's an error, and the return value
399   is the error message.
400
401   """
402   vgsize = vglist.get(vgname, None)
403   if vgsize is None:
404     return "volume group '%s' missing" % vgname
405   elif vgsize < 20480:
406     return ("volume group '%s' too small (20480MiB required, %dMib found)" %
407             (vgname, vgsize))
408   return None
409
410
411 def _InitSSHSetup(node):
412   """Setup the SSH configuration for the cluster.
413
414
415   This generates a dsa keypair for root, adds the pub key to the
416   permitted hosts and adds the hostkey to its own known hosts.
417
418   Args:
419     node: the name of this host as a fqdn
420
421   """
422   priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
423
424   for name in priv_key, pub_key:
425     if os.path.exists(name):
426       utils.CreateBackup(name)
427     utils.RemoveFile(name)
428
429   result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
430                          "-f", priv_key,
431                          "-q", "-N", ""])
432   if result.failed:
433     raise errors.OpExecError("Could not generate ssh keypair, error %s" %
434                              result.output)
435
436   f = open(pub_key, 'r')
437   try:
438     utils.AddAuthorizedKey(auth_keys, f.read(8192))
439   finally:
440     f.close()
441
442
443 def _InitGanetiServerSetup(ss):
444   """Setup the necessary configuration for the initial node daemon.
445
446   This creates the nodepass file containing the shared password for
447   the cluster and also generates the SSL certificate.
448
449   """
450   # Create pseudo random password
451   randpass = sha.new(os.urandom(64)).hexdigest()
452   # and write it into sstore
453   ss.SetKey(ss.SS_NODED_PASS, randpass)
454
455   result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
456                          "-days", str(365*5), "-nodes", "-x509",
457                          "-keyout", constants.SSL_CERT_FILE,
458                          "-out", constants.SSL_CERT_FILE, "-batch"])
459   if result.failed:
460     raise errors.OpExecError("could not generate server ssl cert, command"
461                              " %s had exitcode %s and error message %s" %
462                              (result.cmd, result.exit_code, result.output))
463
464   os.chmod(constants.SSL_CERT_FILE, 0400)
465
466   result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
467
468   if result.failed:
469     raise errors.OpExecError("Could not start the node daemon, command %s"
470                              " had exitcode %s and error %s" %
471                              (result.cmd, result.exit_code, result.output))
472
473
474 def _CheckInstanceBridgesExist(instance):
475   """Check that the brigdes needed by an instance exist.
476
477   """
478   # check bridges existance
479   brlist = [nic.bridge for nic in instance.nics]
480   if not rpc.call_bridges_exist(instance.primary_node, brlist):
481     raise errors.OpPrereqError("one or more target bridges %s does not"
482                                " exist on destination node '%s'" %
483                                (brlist, instance.primary_node))
484
485
486 class LUInitCluster(LogicalUnit):
487   """Initialise the cluster.
488
489   """
490   HPATH = "cluster-init"
491   HTYPE = constants.HTYPE_CLUSTER
492   _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
493               "def_bridge", "master_netdev"]
494   REQ_CLUSTER = False
495
496   def BuildHooksEnv(self):
497     """Build hooks env.
498
499     Notes: Since we don't require a cluster, we must manually add
500     ourselves in the post-run node list.
501
502     """
503     env = {"OP_TARGET": self.op.cluster_name}
504     return env, [], [self.hostname.name]
505
506   def CheckPrereq(self):
507     """Verify that the passed name is a valid one.
508
509     """
510     if config.ConfigWriter.IsCluster():
511       raise errors.OpPrereqError("Cluster is already initialised")
512
513     if self.op.hypervisor_type == constants.HT_XEN_HVM31:
514       if not os.path.exists(constants.VNC_PASSWORD_FILE):
515         raise errors.OpPrereqError("Please prepare the cluster VNC"
516                                    "password file %s" %
517                                    constants.VNC_PASSWORD_FILE)
518
519     self.hostname = hostname = utils.HostInfo()
520
521     if hostname.ip.startswith("127."):
522       raise errors.OpPrereqError("This host's IP resolves to the private"
523                                  " range (%s). Please fix DNS or %s." %
524                                  (hostname.ip, constants.ETC_HOSTS))
525
526     if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
527                          source=constants.LOCALHOST_IP_ADDRESS):
528       raise errors.OpPrereqError("Inconsistency: this host's name resolves"
529                                  " to %s,\nbut this ip address does not"
530                                  " belong to this host."
531                                  " Aborting." % hostname.ip)
532
533     self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
534
535     if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
536                      timeout=5):
537       raise errors.OpPrereqError("Cluster IP already active. Aborting.")
538
539     secondary_ip = getattr(self.op, "secondary_ip", None)
540     if secondary_ip and not utils.IsValidIP(secondary_ip):
541       raise errors.OpPrereqError("Invalid secondary ip given")
542     if (secondary_ip and
543         secondary_ip != hostname.ip and
544         (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
545                            source=constants.LOCALHOST_IP_ADDRESS))):
546       raise errors.OpPrereqError("You gave %s as secondary IP,"
547                                  " but it does not belong to this host." %
548                                  secondary_ip)
549     self.secondary_ip = secondary_ip
550
551     # checks presence of the volume group given
552     vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
553
554     if vgstatus:
555       raise errors.OpPrereqError("Error: %s" % vgstatus)
556
557     if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
558                     self.op.mac_prefix):
559       raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
560                                  self.op.mac_prefix)
561
562     if self.op.hypervisor_type not in constants.HYPER_TYPES:
563       raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
564                                  self.op.hypervisor_type)
565
566     result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
567     if result.failed:
568       raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
569                                  (self.op.master_netdev,
570                                   result.output.strip()))
571
572     if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
573             os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
574       raise errors.OpPrereqError("Init.d script '%s' missing or not"
575                                  " executable." % constants.NODE_INITD_SCRIPT)
576
577   def Exec(self, feedback_fn):
578     """Initialize the cluster.
579
580     """
581     clustername = self.clustername
582     hostname = self.hostname
583
584     # set up the simple store
585     self.sstore = ss = ssconf.SimpleStore()
586     ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
587     ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
588     ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
589     ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
590     ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
591
592     # set up the inter-node password and certificate
593     _InitGanetiServerSetup(ss)
594
595     # start the master ip
596     rpc.call_node_start_master(hostname.name)
597
598     # set up ssh config and /etc/hosts
599     f = open(constants.SSH_HOST_RSA_PUB, 'r')
600     try:
601       sshline = f.read()
602     finally:
603       f.close()
604     sshkey = sshline.split(" ")[1]
605
606     _AddHostToEtcHosts(hostname.name)
607
608     _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
609
610     _InitSSHSetup(hostname.name)
611
612     # init of cluster config file
613     self.cfg = cfgw = config.ConfigWriter()
614     cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
615                     sshkey, self.op.mac_prefix,
616                     self.op.vg_name, self.op.def_bridge)
617
618
619 class LUDestroyCluster(NoHooksLU):
620   """Logical unit for destroying the cluster.
621
622   """
623   _OP_REQP = []
624
625   def CheckPrereq(self):
626     """Check prerequisites.
627
628     This checks whether the cluster is empty.
629
630     Any errors are signalled by raising errors.OpPrereqError.
631
632     """
633     master = self.sstore.GetMasterNode()
634
635     nodelist = self.cfg.GetNodeList()
636     if len(nodelist) != 1 or nodelist[0] != master:
637       raise errors.OpPrereqError("There are still %d node(s) in"
638                                  " this cluster." % (len(nodelist) - 1))
639     instancelist = self.cfg.GetInstanceList()
640     if instancelist:
641       raise errors.OpPrereqError("There are still %d instance(s) in"
642                                  " this cluster." % len(instancelist))
643
644   def Exec(self, feedback_fn):
645     """Destroys the cluster.
646
647     """
648     master = self.sstore.GetMasterNode()
649     if not rpc.call_node_stop_master(master):
650       raise errors.OpExecError("Could not disable the master role")
651     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
652     utils.CreateBackup(priv_key)
653     utils.CreateBackup(pub_key)
654     rpc.call_node_leave_cluster(master)
655
656
657 class LUVerifyCluster(NoHooksLU):
658   """Verifies the cluster status.
659
660   """
661   _OP_REQP = []
662
663   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
664                   remote_version, feedback_fn):
665     """Run multiple tests against a node.
666
667     Test list:
668       - compares ganeti version
669       - checks vg existance and size > 20G
670       - checks config file checksum
671       - checks ssh to other nodes
672
673     Args:
674       node: name of the node to check
675       file_list: required list of files
676       local_cksum: dictionary of local files and their checksums
677
678     """
679     # compares ganeti version
680     local_version = constants.PROTOCOL_VERSION
681     if not remote_version:
682       feedback_fn("  - ERROR: connection to %s failed" % (node))
683       return True
684
685     if local_version != remote_version:
686       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
687                       (local_version, node, remote_version))
688       return True
689
690     # checks vg existance and size > 20G
691
692     bad = False
693     if not vglist:
694       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
695                       (node,))
696       bad = True
697     else:
698       vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
699       if vgstatus:
700         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
701         bad = True
702
703     # checks config file checksum
704     # checks ssh to any
705
706     if 'filelist' not in node_result:
707       bad = True
708       feedback_fn("  - ERROR: node hasn't returned file checksum data")
709     else:
710       remote_cksum = node_result['filelist']
711       for file_name in file_list:
712         if file_name not in remote_cksum:
713           bad = True
714           feedback_fn("  - ERROR: file '%s' missing" % file_name)
715         elif remote_cksum[file_name] != local_cksum[file_name]:
716           bad = True
717           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
718
719     if 'nodelist' not in node_result:
720       bad = True
721       feedback_fn("  - ERROR: node hasn't returned node connectivity data")
722     else:
723       if node_result['nodelist']:
724         bad = True
725         for node in node_result['nodelist']:
726           feedback_fn("  - ERROR: communication with node '%s': %s" %
727                           (node, node_result['nodelist'][node]))
728     hyp_result = node_result.get('hypervisor', None)
729     if hyp_result is not None:
730       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
731     return bad
732
733   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
734                       node_instance, feedback_fn):
735     """Verify an instance.
736
737     This function checks to see if the required block devices are
738     available on the instance's node.
739
740     """
741     bad = False
742
743     node_current = instanceconfig.primary_node
744
745     node_vol_should = {}
746     instanceconfig.MapLVsByNode(node_vol_should)
747
748     for node in node_vol_should:
749       for volume in node_vol_should[node]:
750         if node not in node_vol_is or volume not in node_vol_is[node]:
751           feedback_fn("  - ERROR: volume %s missing on node %s" %
752                           (volume, node))
753           bad = True
754
755     if not instanceconfig.status == 'down':
756       if (node_current not in node_instance or
757           not instance in node_instance[node_current]):
758         feedback_fn("  - ERROR: instance %s not running on node %s" %
759                         (instance, node_current))
760         bad = True
761
762     for node in node_instance:
763       if (not node == node_current):
764         if instance in node_instance[node]:
765           feedback_fn("  - ERROR: instance %s should not run on node %s" %
766                           (instance, node))
767           bad = True
768
769     return bad
770
771   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
772     """Verify if there are any unknown volumes in the cluster.
773
774     The .os, .swap and backup volumes are ignored. All other volumes are
775     reported as unknown.
776
777     """
778     bad = False
779
780     for node in node_vol_is:
781       for volume in node_vol_is[node]:
782         if node not in node_vol_should or volume not in node_vol_should[node]:
783           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
784                       (volume, node))
785           bad = True
786     return bad
787
788   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
789     """Verify the list of running instances.
790
791     This checks what instances are running but unknown to the cluster.
792
793     """
794     bad = False
795     for node in node_instance:
796       for runninginstance in node_instance[node]:
797         if runninginstance not in instancelist:
798           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
799                           (runninginstance, node))
800           bad = True
801     return bad
802
803   def CheckPrereq(self):
804     """Check prerequisites.
805
806     This has no prerequisites.
807
808     """
809     pass
810
811   def Exec(self, feedback_fn):
812     """Verify integrity of cluster, performing various test on nodes.
813
814     """
815     bad = False
816     feedback_fn("* Verifying global settings")
817     for msg in self.cfg.VerifyConfig():
818       feedback_fn("  - ERROR: %s" % msg)
819
820     vg_name = self.cfg.GetVGName()
821     nodelist = utils.NiceSort(self.cfg.GetNodeList())
822     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
823     i_non_redundant = [] # Non redundant instances
824     node_volume = {}
825     node_instance = {}
826     node_info = {}
827     instance_cfg = {}
828
829     # FIXME: verify OS list
830     # do local checksums
831     file_names = list(self.sstore.GetFileList())
832     file_names.append(constants.SSL_CERT_FILE)
833     file_names.append(constants.CLUSTER_CONF_FILE)
834     local_checksums = utils.FingerprintFiles(file_names)
835
836     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
837     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
838     all_instanceinfo = rpc.call_instance_list(nodelist)
839     all_vglist = rpc.call_vg_list(nodelist)
840     node_verify_param = {
841       'filelist': file_names,
842       'nodelist': nodelist,
843       'hypervisor': None,
844       }
845     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
846     all_rversion = rpc.call_version(nodelist)
847     all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
848
849     for node in nodelist:
850       feedback_fn("* Verifying node %s" % node)
851       result = self._VerifyNode(node, file_names, local_checksums,
852                                 all_vglist[node], all_nvinfo[node],
853                                 all_rversion[node], feedback_fn)
854       bad = bad or result
855
856       # node_volume
857       volumeinfo = all_volumeinfo[node]
858
859       if isinstance(volumeinfo, basestring):
860         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
861                     (node, volumeinfo[-400:].encode('string_escape')))
862         bad = True
863         node_volume[node] = {}
864       elif not isinstance(volumeinfo, dict):
865         feedback_fn("  - ERROR: connection to %s failed" % (node,))
866         bad = True
867         continue
868       else:
869         node_volume[node] = volumeinfo
870
871       # node_instance
872       nodeinstance = all_instanceinfo[node]
873       if type(nodeinstance) != list:
874         feedback_fn("  - ERROR: connection to %s failed" % (node,))
875         bad = True
876         continue
877
878       node_instance[node] = nodeinstance
879
880       # node_info
881       nodeinfo = all_ninfo[node]
882       if not isinstance(nodeinfo, dict):
883         feedback_fn("  - ERROR: connection to %s failed" % (node,))
884         bad = True
885         continue
886
887       try:
888         node_info[node] = {
889           "mfree": int(nodeinfo['memory_free']),
890           "dfree": int(nodeinfo['vg_free']),
891           "pinst": [],
892           "sinst": [],
893           # dictionary holding all instances this node is secondary for,
894           # grouped by their primary node. Each key is a cluster node, and each
895           # value is a list of instances which have the key as primary and the
896           # current node as secondary.  this is handy to calculate N+1 memory
897           # availability if you can only failover from a primary to its
898           # secondary.
899           "sinst-by-pnode": {},
900         }
901       except ValueError:
902         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
903         bad = True
904         continue
905
906     node_vol_should = {}
907
908     for instance in instancelist:
909       feedback_fn("* Verifying instance %s" % instance)
910       inst_config = self.cfg.GetInstanceInfo(instance)
911       result =  self._VerifyInstance(instance, inst_config, node_volume,
912                                      node_instance, feedback_fn)
913       bad = bad or result
914
915       inst_config.MapLVsByNode(node_vol_should)
916
917       instance_cfg[instance] = inst_config
918
919       pnode = inst_config.primary_node
920       if pnode in node_info:
921         node_info[pnode]['pinst'].append(instance)
922       else:
923         feedback_fn("  - ERROR: instance %s, connection to primary node"
924                     " %s failed" % (instance, pnode))
925         bad = True
926
927       # If the instance is non-redundant we cannot survive losing its primary
928       # node, so we are not N+1 compliant. On the other hand we have no disk
929       # templates with more than one secondary so that situation is not well
930       # supported either.
931       # FIXME: does not support file-backed instances
932       if len(inst_config.secondary_nodes) == 0:
933         i_non_redundant.append(instance)
934       elif len(inst_config.secondary_nodes) > 1:
935         feedback_fn("  - WARNING: multiple secondaries for instance %s"
936                     % instance)
937
938       for snode in inst_config.secondary_nodes:
939         if snode in node_info:
940           node_info[snode]['sinst'].append(instance)
941           if pnode not in node_info[snode]['sinst-by-pnode']:
942             node_info[snode]['sinst-by-pnode'][pnode] = []
943           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
944         else:
945           feedback_fn("  - ERROR: instance %s, connection to secondary node"
946                       " %s failed" % (instance, snode))
947
948     feedback_fn("* Verifying orphan volumes")
949     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
950                                        feedback_fn)
951     bad = bad or result
952
953     feedback_fn("* Verifying remaining instances")
954     result = self._VerifyOrphanInstances(instancelist, node_instance,
955                                          feedback_fn)
956     bad = bad or result
957
958     return int(bad)
959
960
961 class LUVerifyDisks(NoHooksLU):
962   """Verifies the cluster disks status.
963
964   """
965   _OP_REQP = []
966
967   def CheckPrereq(self):
968     """Check prerequisites.
969
970     This has no prerequisites.
971
972     """
973     pass
974
975   def Exec(self, feedback_fn):
976     """Verify integrity of cluster disks.
977
978     """
979     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
980
981     vg_name = self.cfg.GetVGName()
982     nodes = utils.NiceSort(self.cfg.GetNodeList())
983     instances = [self.cfg.GetInstanceInfo(name)
984                  for name in self.cfg.GetInstanceList()]
985
986     nv_dict = {}
987     for inst in instances:
988       inst_lvs = {}
989       if (inst.status != "up" or
990           inst.disk_template not in constants.DTS_NET_MIRROR):
991         continue
992       inst.MapLVsByNode(inst_lvs)
993       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
994       for node, vol_list in inst_lvs.iteritems():
995         for vol in vol_list:
996           nv_dict[(node, vol)] = inst
997
998     if not nv_dict:
999       return result
1000
1001     node_lvs = rpc.call_volume_list(nodes, vg_name)
1002
1003     to_act = set()
1004     for node in nodes:
1005       # node_volume
1006       lvs = node_lvs[node]
1007
1008       if isinstance(lvs, basestring):
1009         logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
1010         res_nlvm[node] = lvs
1011       elif not isinstance(lvs, dict):
1012         logger.Info("connection to node %s failed or invalid data returned" %
1013                     (node,))
1014         res_nodes.append(node)
1015         continue
1016
1017       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1018         inst = nv_dict.pop((node, lv_name), None)
1019         if (not lv_online and inst is not None
1020             and inst.name not in res_instances):
1021           res_instances.append(inst.name)
1022
1023     # any leftover items in nv_dict are missing LVs, let's arrange the
1024     # data better
1025     for key, inst in nv_dict.iteritems():
1026       if inst.name not in res_missing:
1027         res_missing[inst.name] = []
1028       res_missing[inst.name].append(key)
1029
1030     return result
1031
1032
1033 class LURenameCluster(LogicalUnit):
1034   """Rename the cluster.
1035
1036   """
1037   HPATH = "cluster-rename"
1038   HTYPE = constants.HTYPE_CLUSTER
1039   _OP_REQP = ["name"]
1040
1041   def BuildHooksEnv(self):
1042     """Build hooks env.
1043
1044     """
1045     env = {
1046       "OP_TARGET": self.sstore.GetClusterName(),
1047       "NEW_NAME": self.op.name,
1048       }
1049     mn = self.sstore.GetMasterNode()
1050     return env, [mn], [mn]
1051
1052   def CheckPrereq(self):
1053     """Verify that the passed name is a valid one.
1054
1055     """
1056     hostname = utils.HostInfo(self.op.name)
1057
1058     new_name = hostname.name
1059     self.ip = new_ip = hostname.ip
1060     old_name = self.sstore.GetClusterName()
1061     old_ip = self.sstore.GetMasterIP()
1062     if new_name == old_name and new_ip == old_ip:
1063       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1064                                  " cluster has changed")
1065     if new_ip != old_ip:
1066       result = utils.RunCmd(["fping", "-q", new_ip])
1067       if not result.failed:
1068         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1069                                    " reachable on the network. Aborting." %
1070                                    new_ip)
1071
1072     self.op.name = new_name
1073
1074   def Exec(self, feedback_fn):
1075     """Rename the cluster.
1076
1077     """
1078     clustername = self.op.name
1079     ip = self.ip
1080     ss = self.sstore
1081
1082     # shutdown the master IP
1083     master = ss.GetMasterNode()
1084     if not rpc.call_node_stop_master(master):
1085       raise errors.OpExecError("Could not disable the master role")
1086
1087     try:
1088       # modify the sstore
1089       ss.SetKey(ss.SS_MASTER_IP, ip)
1090       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1091
1092       # Distribute updated ss config to all nodes
1093       myself = self.cfg.GetNodeInfo(master)
1094       dist_nodes = self.cfg.GetNodeList()
1095       if myself.name in dist_nodes:
1096         dist_nodes.remove(myself.name)
1097
1098       logger.Debug("Copying updated ssconf data to all nodes")
1099       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1100         fname = ss.KeyToFilename(keyname)
1101         result = rpc.call_upload_file(dist_nodes, fname)
1102         for to_node in dist_nodes:
1103           if not result[to_node]:
1104             logger.Error("copy of file %s to node %s failed" %
1105                          (fname, to_node))
1106     finally:
1107       if not rpc.call_node_start_master(master):
1108         logger.Error("Could not re-enable the master role on the master,"
1109                      " please restart manually.")
1110
1111
1112 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1113   """Sleep and poll for an instance's disk to sync.
1114
1115   """
1116   if not instance.disks:
1117     return True
1118
1119   if not oneshot:
1120     proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1121
1122   node = instance.primary_node
1123
1124   for dev in instance.disks:
1125     cfgw.SetDiskID(dev, node)
1126
1127   retries = 0
1128   while True:
1129     max_time = 0
1130     done = True
1131     cumul_degraded = False
1132     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1133     if not rstats:
1134       proc.LogWarning("Can't get any data from node %s" % node)
1135       retries += 1
1136       if retries >= 10:
1137         raise errors.RemoteError("Can't contact node %s for mirror data,"
1138                                  " aborting." % node)
1139       time.sleep(6)
1140       continue
1141     retries = 0
1142     for i in range(len(rstats)):
1143       mstat = rstats[i]
1144       if mstat is None:
1145         proc.LogWarning("Can't compute data for node %s/%s" %
1146                         (node, instance.disks[i].iv_name))
1147         continue
1148       # we ignore the ldisk parameter
1149       perc_done, est_time, is_degraded, _ = mstat
1150       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1151       if perc_done is not None:
1152         done = False
1153         if est_time is not None:
1154           rem_time = "%d estimated seconds remaining" % est_time
1155           max_time = est_time
1156         else:
1157           rem_time = "no time estimate"
1158         proc.LogInfo("- device %s: %5.2f%% done, %s" %
1159                      (instance.disks[i].iv_name, perc_done, rem_time))
1160     if done or oneshot:
1161       break
1162
1163     if unlock:
1164       utils.Unlock('cmd')
1165     try:
1166       time.sleep(min(60, max_time))
1167     finally:
1168       if unlock:
1169         utils.Lock('cmd')
1170
1171   if done:
1172     proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1173   return not cumul_degraded
1174
1175
1176 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1177   """Check that mirrors are not degraded.
1178
1179   The ldisk parameter, if True, will change the test from the
1180   is_degraded attribute (which represents overall non-ok status for
1181   the device(s)) to the ldisk (representing the local storage status).
1182
1183   """
1184   cfgw.SetDiskID(dev, node)
1185   if ldisk:
1186     idx = 6
1187   else:
1188     idx = 5
1189
1190   result = True
1191   if on_primary or dev.AssembleOnSecondary():
1192     rstats = rpc.call_blockdev_find(node, dev)
1193     if not rstats:
1194       logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1195       result = False
1196     else:
1197       result = result and (not rstats[idx])
1198   if dev.children:
1199     for child in dev.children:
1200       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1201
1202   return result
1203
1204
1205 class LUDiagnoseOS(NoHooksLU):
1206   """Logical unit for OS diagnose/query.
1207
1208   """
1209   _OP_REQP = []
1210
1211   def CheckPrereq(self):
1212     """Check prerequisites.
1213
1214     This always succeeds, since this is a pure query LU.
1215
1216     """
1217     return
1218
1219   def Exec(self, feedback_fn):
1220     """Compute the list of OSes.
1221
1222     """
1223     node_list = self.cfg.GetNodeList()
1224     node_data = rpc.call_os_diagnose(node_list)
1225     if node_data == False:
1226       raise errors.OpExecError("Can't gather the list of OSes")
1227     return node_data
1228
1229
1230 class LURemoveNode(LogicalUnit):
1231   """Logical unit for removing a node.
1232
1233   """
1234   HPATH = "node-remove"
1235   HTYPE = constants.HTYPE_NODE
1236   _OP_REQP = ["node_name"]
1237
1238   def BuildHooksEnv(self):
1239     """Build hooks env.
1240
1241     This doesn't run on the target node in the pre phase as a failed
1242     node would not allows itself to run.
1243
1244     """
1245     env = {
1246       "OP_TARGET": self.op.node_name,
1247       "NODE_NAME": self.op.node_name,
1248       }
1249     all_nodes = self.cfg.GetNodeList()
1250     all_nodes.remove(self.op.node_name)
1251     return env, all_nodes, all_nodes
1252
1253   def CheckPrereq(self):
1254     """Check prerequisites.
1255
1256     This checks:
1257      - the node exists in the configuration
1258      - it does not have primary or secondary instances
1259      - it's not the master
1260
1261     Any errors are signalled by raising errors.OpPrereqError.
1262
1263     """
1264     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1265     if node is None:
1266       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1267
1268     instance_list = self.cfg.GetInstanceList()
1269
1270     masternode = self.sstore.GetMasterNode()
1271     if node.name == masternode:
1272       raise errors.OpPrereqError("Node is the master node,"
1273                                  " you need to failover first.")
1274
1275     for instance_name in instance_list:
1276       instance = self.cfg.GetInstanceInfo(instance_name)
1277       if node.name == instance.primary_node:
1278         raise errors.OpPrereqError("Instance %s still running on the node,"
1279                                    " please remove first." % instance_name)
1280       if node.name in instance.secondary_nodes:
1281         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1282                                    " please remove first." % instance_name)
1283     self.op.node_name = node.name
1284     self.node = node
1285
1286   def Exec(self, feedback_fn):
1287     """Removes the node from the cluster.
1288
1289     """
1290     node = self.node
1291     logger.Info("stopping the node daemon and removing configs from node %s" %
1292                 node.name)
1293
1294     rpc.call_node_leave_cluster(node.name)
1295
1296     ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1297
1298     logger.Info("Removing node %s from config" % node.name)
1299
1300     self.cfg.RemoveNode(node.name)
1301
1302     _RemoveHostFromEtcHosts(node.name)
1303
1304
1305 class LUQueryNodes(NoHooksLU):
1306   """Logical unit for querying nodes.
1307
1308   """
1309   _OP_REQP = ["output_fields", "names"]
1310
1311   def CheckPrereq(self):
1312     """Check prerequisites.
1313
1314     This checks that the fields required are valid output fields.
1315
1316     """
1317     self.dynamic_fields = frozenset(["dtotal", "dfree",
1318                                      "mtotal", "mnode", "mfree",
1319                                      "bootid"])
1320
1321     _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1322                                "pinst_list", "sinst_list",
1323                                "pip", "sip"],
1324                        dynamic=self.dynamic_fields,
1325                        selected=self.op.output_fields)
1326
1327     self.wanted = _GetWantedNodes(self, self.op.names)
1328
1329   def Exec(self, feedback_fn):
1330     """Computes the list of nodes and their attributes.
1331
1332     """
1333     nodenames = self.wanted
1334     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1335
1336     # begin data gathering
1337
1338     if self.dynamic_fields.intersection(self.op.output_fields):
1339       live_data = {}
1340       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1341       for name in nodenames:
1342         nodeinfo = node_data.get(name, None)
1343         if nodeinfo:
1344           live_data[name] = {
1345             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1346             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1347             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1348             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1349             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1350             "bootid": nodeinfo['bootid'],
1351             }
1352         else:
1353           live_data[name] = {}
1354     else:
1355       live_data = dict.fromkeys(nodenames, {})
1356
1357     node_to_primary = dict([(name, set()) for name in nodenames])
1358     node_to_secondary = dict([(name, set()) for name in nodenames])
1359
1360     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1361                              "sinst_cnt", "sinst_list"))
1362     if inst_fields & frozenset(self.op.output_fields):
1363       instancelist = self.cfg.GetInstanceList()
1364
1365       for instance_name in instancelist:
1366         inst = self.cfg.GetInstanceInfo(instance_name)
1367         if inst.primary_node in node_to_primary:
1368           node_to_primary[inst.primary_node].add(inst.name)
1369         for secnode in inst.secondary_nodes:
1370           if secnode in node_to_secondary:
1371             node_to_secondary[secnode].add(inst.name)
1372
1373     # end data gathering
1374
1375     output = []
1376     for node in nodelist:
1377       node_output = []
1378       for field in self.op.output_fields:
1379         if field == "name":
1380           val = node.name
1381         elif field == "pinst_list":
1382           val = list(node_to_primary[node.name])
1383         elif field == "sinst_list":
1384           val = list(node_to_secondary[node.name])
1385         elif field == "pinst_cnt":
1386           val = len(node_to_primary[node.name])
1387         elif field == "sinst_cnt":
1388           val = len(node_to_secondary[node.name])
1389         elif field == "pip":
1390           val = node.primary_ip
1391         elif field == "sip":
1392           val = node.secondary_ip
1393         elif field in self.dynamic_fields:
1394           val = live_data[node.name].get(field, None)
1395         else:
1396           raise errors.ParameterError(field)
1397         node_output.append(val)
1398       output.append(node_output)
1399
1400     return output
1401
1402
1403 class LUQueryNodeVolumes(NoHooksLU):
1404   """Logical unit for getting volumes on node(s).
1405
1406   """
1407   _OP_REQP = ["nodes", "output_fields"]
1408
1409   def CheckPrereq(self):
1410     """Check prerequisites.
1411
1412     This checks that the fields required are valid output fields.
1413
1414     """
1415     self.nodes = _GetWantedNodes(self, self.op.nodes)
1416
1417     _CheckOutputFields(static=["node"],
1418                        dynamic=["phys", "vg", "name", "size", "instance"],
1419                        selected=self.op.output_fields)
1420
1421
1422   def Exec(self, feedback_fn):
1423     """Computes the list of nodes and their attributes.
1424
1425     """
1426     nodenames = self.nodes
1427     volumes = rpc.call_node_volumes(nodenames)
1428
1429     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1430              in self.cfg.GetInstanceList()]
1431
1432     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1433
1434     output = []
1435     for node in nodenames:
1436       if node not in volumes or not volumes[node]:
1437         continue
1438
1439       node_vols = volumes[node][:]
1440       node_vols.sort(key=lambda vol: vol['dev'])
1441
1442       for vol in node_vols:
1443         node_output = []
1444         for field in self.op.output_fields:
1445           if field == "node":
1446             val = node
1447           elif field == "phys":
1448             val = vol['dev']
1449           elif field == "vg":
1450             val = vol['vg']
1451           elif field == "name":
1452             val = vol['name']
1453           elif field == "size":
1454             val = int(float(vol['size']))
1455           elif field == "instance":
1456             for inst in ilist:
1457               if node not in lv_by_node[inst]:
1458                 continue
1459               if vol['name'] in lv_by_node[inst][node]:
1460                 val = inst.name
1461                 break
1462             else:
1463               val = '-'
1464           else:
1465             raise errors.ParameterError(field)
1466           node_output.append(str(val))
1467
1468         output.append(node_output)
1469
1470     return output
1471
1472
1473 class LUAddNode(LogicalUnit):
1474   """Logical unit for adding node to the cluster.
1475
1476   """
1477   HPATH = "node-add"
1478   HTYPE = constants.HTYPE_NODE
1479   _OP_REQP = ["node_name"]
1480
1481   def BuildHooksEnv(self):
1482     """Build hooks env.
1483
1484     This will run on all nodes before, and on all nodes + the new node after.
1485
1486     """
1487     env = {
1488       "OP_TARGET": self.op.node_name,
1489       "NODE_NAME": self.op.node_name,
1490       "NODE_PIP": self.op.primary_ip,
1491       "NODE_SIP": self.op.secondary_ip,
1492       }
1493     nodes_0 = self.cfg.GetNodeList()
1494     nodes_1 = nodes_0 + [self.op.node_name, ]
1495     return env, nodes_0, nodes_1
1496
1497   def CheckPrereq(self):
1498     """Check prerequisites.
1499
1500     This checks:
1501      - the new node is not already in the config
1502      - it is resolvable
1503      - its parameters (single/dual homed) matches the cluster
1504
1505     Any errors are signalled by raising errors.OpPrereqError.
1506
1507     """
1508     node_name = self.op.node_name
1509     cfg = self.cfg
1510
1511     dns_data = utils.HostInfo(node_name)
1512
1513     node = dns_data.name
1514     primary_ip = self.op.primary_ip = dns_data.ip
1515     secondary_ip = getattr(self.op, "secondary_ip", None)
1516     if secondary_ip is None:
1517       secondary_ip = primary_ip
1518     if not utils.IsValidIP(secondary_ip):
1519       raise errors.OpPrereqError("Invalid secondary IP given")
1520     self.op.secondary_ip = secondary_ip
1521     node_list = cfg.GetNodeList()
1522     if node in node_list:
1523       raise errors.OpPrereqError("Node %s is already in the configuration"
1524                                  % node)
1525
1526     for existing_node_name in node_list:
1527       existing_node = cfg.GetNodeInfo(existing_node_name)
1528       if (existing_node.primary_ip == primary_ip or
1529           existing_node.secondary_ip == primary_ip or
1530           existing_node.primary_ip == secondary_ip or
1531           existing_node.secondary_ip == secondary_ip):
1532         raise errors.OpPrereqError("New node ip address(es) conflict with"
1533                                    " existing node %s" % existing_node.name)
1534
1535     # check that the type of the node (single versus dual homed) is the
1536     # same as for the master
1537     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1538     master_singlehomed = myself.secondary_ip == myself.primary_ip
1539     newbie_singlehomed = secondary_ip == primary_ip
1540     if master_singlehomed != newbie_singlehomed:
1541       if master_singlehomed:
1542         raise errors.OpPrereqError("The master has no private ip but the"
1543                                    " new node has one")
1544       else:
1545         raise errors.OpPrereqError("The master has a private ip but the"
1546                                    " new node doesn't have one")
1547
1548     # checks reachablity
1549     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1550       raise errors.OpPrereqError("Node not reachable by ping")
1551
1552     if not newbie_singlehomed:
1553       # check reachability from my secondary ip to newbie's secondary ip
1554       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1555                            source=myself.secondary_ip):
1556         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1557                                    " based ping to noded port")
1558
1559     self.new_node = objects.Node(name=node,
1560                                  primary_ip=primary_ip,
1561                                  secondary_ip=secondary_ip)
1562
1563     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1564       if not os.path.exists(constants.VNC_PASSWORD_FILE):
1565         raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1566                                    constants.VNC_PASSWORD_FILE)
1567
1568   def Exec(self, feedback_fn):
1569     """Adds the new node to the cluster.
1570
1571     """
1572     new_node = self.new_node
1573     node = new_node.name
1574
1575     # set up inter-node password and certificate and restarts the node daemon
1576     gntpass = self.sstore.GetNodeDaemonPassword()
1577     if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1578       raise errors.OpExecError("ganeti password corruption detected")
1579     f = open(constants.SSL_CERT_FILE)
1580     try:
1581       gntpem = f.read(8192)
1582     finally:
1583       f.close()
1584     # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1585     # so we use this to detect an invalid certificate; as long as the
1586     # cert doesn't contain this, the here-document will be correctly
1587     # parsed by the shell sequence below
1588     if re.search('^!EOF\.', gntpem, re.MULTILINE):
1589       raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1590     if not gntpem.endswith("\n"):
1591       raise errors.OpExecError("PEM must end with newline")
1592     logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1593
1594     # and then connect with ssh to set password and start ganeti-noded
1595     # note that all the below variables are sanitized at this point,
1596     # either by being constants or by the checks above
1597     ss = self.sstore
1598     mycommand = ("umask 077 && "
1599                  "echo '%s' > '%s' && "
1600                  "cat > '%s' << '!EOF.' && \n"
1601                  "%s!EOF.\n%s restart" %
1602                  (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1603                   constants.SSL_CERT_FILE, gntpem,
1604                   constants.NODE_INITD_SCRIPT))
1605
1606     result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1607     if result.failed:
1608       raise errors.OpExecError("Remote command on node %s, error: %s,"
1609                                " output: %s" %
1610                                (node, result.fail_reason, result.output))
1611
1612     # check connectivity
1613     time.sleep(4)
1614
1615     result = rpc.call_version([node])[node]
1616     if result:
1617       if constants.PROTOCOL_VERSION == result:
1618         logger.Info("communication to node %s fine, sw version %s match" %
1619                     (node, result))
1620       else:
1621         raise errors.OpExecError("Version mismatch master version %s,"
1622                                  " node version %s" %
1623                                  (constants.PROTOCOL_VERSION, result))
1624     else:
1625       raise errors.OpExecError("Cannot get version from the new node")
1626
1627     # setup ssh on node
1628     logger.Info("copy ssh key to node %s" % node)
1629     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1630     keyarray = []
1631     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1632                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1633                 priv_key, pub_key]
1634
1635     for i in keyfiles:
1636       f = open(i, 'r')
1637       try:
1638         keyarray.append(f.read())
1639       finally:
1640         f.close()
1641
1642     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1643                                keyarray[3], keyarray[4], keyarray[5])
1644
1645     if not result:
1646       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1647
1648     # Add node to our /etc/hosts, and add key to known_hosts
1649     _AddHostToEtcHosts(new_node.name)
1650
1651     _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1652                       self.cfg.GetHostKey())
1653
1654     if new_node.secondary_ip != new_node.primary_ip:
1655       if not rpc.call_node_tcp_ping(new_node.name,
1656                                     constants.LOCALHOST_IP_ADDRESS,
1657                                     new_node.secondary_ip,
1658                                     constants.DEFAULT_NODED_PORT,
1659                                     10, False):
1660         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1661                                  " you gave (%s). Please fix and re-run this"
1662                                  " command." % new_node.secondary_ip)
1663
1664     success, msg = ssh.VerifyNodeHostname(node)
1665     if not success:
1666       raise errors.OpExecError("Node '%s' claims it has a different hostname"
1667                                " than the one the resolver gives: %s."
1668                                " Please fix and re-run this command." %
1669                                (node, msg))
1670
1671     # Distribute updated /etc/hosts and known_hosts to all nodes,
1672     # including the node just added
1673     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1674     dist_nodes = self.cfg.GetNodeList() + [node]
1675     if myself.name in dist_nodes:
1676       dist_nodes.remove(myself.name)
1677
1678     logger.Debug("Copying hosts and known_hosts to all nodes")
1679     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1680       result = rpc.call_upload_file(dist_nodes, fname)
1681       for to_node in dist_nodes:
1682         if not result[to_node]:
1683           logger.Error("copy of file %s to node %s failed" %
1684                        (fname, to_node))
1685
1686     to_copy = ss.GetFileList()
1687     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1688       to_copy.append(constants.VNC_PASSWORD_FILE)
1689     for fname in to_copy:
1690       if not ssh.CopyFileToNode(node, fname):
1691         logger.Error("could not copy file %s to node %s" % (fname, node))
1692
1693     logger.Info("adding node %s to cluster.conf" % node)
1694     self.cfg.AddNode(new_node)
1695
1696
1697 class LUMasterFailover(LogicalUnit):
1698   """Failover the master node to the current node.
1699
1700   This is a special LU in that it must run on a non-master node.
1701
1702   """
1703   HPATH = "master-failover"
1704   HTYPE = constants.HTYPE_CLUSTER
1705   REQ_MASTER = False
1706   _OP_REQP = []
1707
1708   def BuildHooksEnv(self):
1709     """Build hooks env.
1710
1711     This will run on the new master only in the pre phase, and on all
1712     the nodes in the post phase.
1713
1714     """
1715     env = {
1716       "OP_TARGET": self.new_master,
1717       "NEW_MASTER": self.new_master,
1718       "OLD_MASTER": self.old_master,
1719       }
1720     return env, [self.new_master], self.cfg.GetNodeList()
1721
1722   def CheckPrereq(self):
1723     """Check prerequisites.
1724
1725     This checks that we are not already the master.
1726
1727     """
1728     self.new_master = utils.HostInfo().name
1729     self.old_master = self.sstore.GetMasterNode()
1730
1731     if self.old_master == self.new_master:
1732       raise errors.OpPrereqError("This commands must be run on the node"
1733                                  " where you want the new master to be."
1734                                  " %s is already the master" %
1735                                  self.old_master)
1736
1737   def Exec(self, feedback_fn):
1738     """Failover the master node.
1739
1740     This command, when run on a non-master node, will cause the current
1741     master to cease being master, and the non-master to become new
1742     master.
1743
1744     """
1745     #TODO: do not rely on gethostname returning the FQDN
1746     logger.Info("setting master to %s, old master: %s" %
1747                 (self.new_master, self.old_master))
1748
1749     if not rpc.call_node_stop_master(self.old_master):
1750       logger.Error("could disable the master role on the old master"
1751                    " %s, please disable manually" % self.old_master)
1752
1753     ss = self.sstore
1754     ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1755     if not rpc.call_upload_file(self.cfg.GetNodeList(),
1756                                 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1757       logger.Error("could not distribute the new simple store master file"
1758                    " to the other nodes, please check.")
1759
1760     if not rpc.call_node_start_master(self.new_master):
1761       logger.Error("could not start the master role on the new master"
1762                    " %s, please check" % self.new_master)
1763       feedback_fn("Error in activating the master IP on the new master,"
1764                   " please fix manually.")
1765
1766
1767
1768 class LUQueryClusterInfo(NoHooksLU):
1769   """Query cluster configuration.
1770
1771   """
1772   _OP_REQP = []
1773   REQ_MASTER = False
1774
1775   def CheckPrereq(self):
1776     """No prerequsites needed for this LU.
1777
1778     """
1779     pass
1780
1781   def Exec(self, feedback_fn):
1782     """Return cluster config.
1783
1784     """
1785     result = {
1786       "name": self.sstore.GetClusterName(),
1787       "software_version": constants.RELEASE_VERSION,
1788       "protocol_version": constants.PROTOCOL_VERSION,
1789       "config_version": constants.CONFIG_VERSION,
1790       "os_api_version": constants.OS_API_VERSION,
1791       "export_version": constants.EXPORT_VERSION,
1792       "master": self.sstore.GetMasterNode(),
1793       "architecture": (platform.architecture()[0], platform.machine()),
1794       }
1795
1796     return result
1797
1798
1799 class LUClusterCopyFile(NoHooksLU):
1800   """Copy file to cluster.
1801
1802   """
1803   _OP_REQP = ["nodes", "filename"]
1804
1805   def CheckPrereq(self):
1806     """Check prerequisites.
1807
1808     It should check that the named file exists and that the given list
1809     of nodes is valid.
1810
1811     """
1812     if not os.path.exists(self.op.filename):
1813       raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1814
1815     self.nodes = _GetWantedNodes(self, self.op.nodes)
1816
1817   def Exec(self, feedback_fn):
1818     """Copy a file from master to some nodes.
1819
1820     Args:
1821       opts - class with options as members
1822       args - list containing a single element, the file name
1823     Opts used:
1824       nodes - list containing the name of target nodes; if empty, all nodes
1825
1826     """
1827     filename = self.op.filename
1828
1829     myname = utils.HostInfo().name
1830
1831     for node in self.nodes:
1832       if node == myname:
1833         continue
1834       if not ssh.CopyFileToNode(node, filename):
1835         logger.Error("Copy of file %s to node %s failed" % (filename, node))
1836
1837
1838 class LUDumpClusterConfig(NoHooksLU):
1839   """Return a text-representation of the cluster-config.
1840
1841   """
1842   _OP_REQP = []
1843
1844   def CheckPrereq(self):
1845     """No prerequisites.
1846
1847     """
1848     pass
1849
1850   def Exec(self, feedback_fn):
1851     """Dump a representation of the cluster config to the standard output.
1852
1853     """
1854     return self.cfg.DumpConfig()
1855
1856
1857 class LURunClusterCommand(NoHooksLU):
1858   """Run a command on some nodes.
1859
1860   """
1861   _OP_REQP = ["command", "nodes"]
1862
1863   def CheckPrereq(self):
1864     """Check prerequisites.
1865
1866     It checks that the given list of nodes is valid.
1867
1868     """
1869     self.nodes = _GetWantedNodes(self, self.op.nodes)
1870
1871   def Exec(self, feedback_fn):
1872     """Run a command on some nodes.
1873
1874     """
1875     # put the master at the end of the nodes list
1876     master_node = self.sstore.GetMasterNode()
1877     if master_node in self.nodes:
1878       self.nodes.remove(master_node)
1879       self.nodes.append(master_node)
1880
1881     data = []
1882     for node in self.nodes:
1883       result = ssh.SSHCall(node, "root", self.op.command)
1884       data.append((node, result.output, result.exit_code))
1885
1886     return data
1887
1888
1889 class LUActivateInstanceDisks(NoHooksLU):
1890   """Bring up an instance's disks.
1891
1892   """
1893   _OP_REQP = ["instance_name"]
1894
1895   def CheckPrereq(self):
1896     """Check prerequisites.
1897
1898     This checks that the instance is in the cluster.
1899
1900     """
1901     instance = self.cfg.GetInstanceInfo(
1902       self.cfg.ExpandInstanceName(self.op.instance_name))
1903     if instance is None:
1904       raise errors.OpPrereqError("Instance '%s' not known" %
1905                                  self.op.instance_name)
1906     self.instance = instance
1907
1908
1909   def Exec(self, feedback_fn):
1910     """Activate the disks.
1911
1912     """
1913     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1914     if not disks_ok:
1915       raise errors.OpExecError("Cannot activate block devices")
1916
1917     return disks_info
1918
1919
1920 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1921   """Prepare the block devices for an instance.
1922
1923   This sets up the block devices on all nodes.
1924
1925   Args:
1926     instance: a ganeti.objects.Instance object
1927     ignore_secondaries: if true, errors on secondary nodes won't result
1928                         in an error return from the function
1929
1930   Returns:
1931     false if the operation failed
1932     list of (host, instance_visible_name, node_visible_name) if the operation
1933          suceeded with the mapping from node devices to instance devices
1934   """
1935   device_info = []
1936   disks_ok = True
1937   iname = instance.name
1938   # With the two passes mechanism we try to reduce the window of
1939   # opportunity for the race condition of switching DRBD to primary
1940   # before handshaking occured, but we do not eliminate it
1941
1942   # The proper fix would be to wait (with some limits) until the
1943   # connection has been made and drbd transitions from WFConnection
1944   # into any other network-connected state (Connected, SyncTarget,
1945   # SyncSource, etc.)
1946
1947   # 1st pass, assemble on all nodes in secondary mode
1948   for inst_disk in instance.disks:
1949     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1950       cfg.SetDiskID(node_disk, node)
1951       result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1952       if not result:
1953         logger.Error("could not prepare block device %s on node %s"
1954                      " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1955         if not ignore_secondaries:
1956           disks_ok = False
1957
1958   # FIXME: race condition on drbd migration to primary
1959
1960   # 2nd pass, do only the primary node
1961   for inst_disk in instance.disks:
1962     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1963       if node != instance.primary_node:
1964         continue
1965       cfg.SetDiskID(node_disk, node)
1966       result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1967       if not result:
1968         logger.Error("could not prepare block device %s on node %s"
1969                      " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1970         disks_ok = False
1971     device_info.append((instance.primary_node, inst_disk.iv_name, result))
1972
1973   # leave the disks configured for the primary node
1974   # this is a workaround that would be fixed better by
1975   # improving the logical/physical id handling
1976   for disk in instance.disks:
1977     cfg.SetDiskID(disk, instance.primary_node)
1978
1979   return disks_ok, device_info
1980
1981
1982 def _StartInstanceDisks(cfg, instance, force):
1983   """Start the disks of an instance.
1984
1985   """
1986   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1987                                            ignore_secondaries=force)
1988   if not disks_ok:
1989     _ShutdownInstanceDisks(instance, cfg)
1990     if force is not None and not force:
1991       logger.Error("If the message above refers to a secondary node,"
1992                    " you can retry the operation using '--force'.")
1993     raise errors.OpExecError("Disk consistency error")
1994
1995
1996 class LUDeactivateInstanceDisks(NoHooksLU):
1997   """Shutdown an instance's disks.
1998
1999   """
2000   _OP_REQP = ["instance_name"]
2001
2002   def CheckPrereq(self):
2003     """Check prerequisites.
2004
2005     This checks that the instance is in the cluster.
2006
2007     """
2008     instance = self.cfg.GetInstanceInfo(
2009       self.cfg.ExpandInstanceName(self.op.instance_name))
2010     if instance is None:
2011       raise errors.OpPrereqError("Instance '%s' not known" %
2012                                  self.op.instance_name)
2013     self.instance = instance
2014
2015   def Exec(self, feedback_fn):
2016     """Deactivate the disks
2017
2018     """
2019     instance = self.instance
2020     ins_l = rpc.call_instance_list([instance.primary_node])
2021     ins_l = ins_l[instance.primary_node]
2022     if not type(ins_l) is list:
2023       raise errors.OpExecError("Can't contact node '%s'" %
2024                                instance.primary_node)
2025
2026     if self.instance.name in ins_l:
2027       raise errors.OpExecError("Instance is running, can't shutdown"
2028                                " block devices.")
2029
2030     _ShutdownInstanceDisks(instance, self.cfg)
2031
2032
2033 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
2034   """Shutdown block devices of an instance.
2035
2036   This does the shutdown on all nodes of the instance.
2037
2038   If the ignore_primary is false, errors on the primary node are
2039   ignored.
2040
2041   """
2042   result = True
2043   for disk in instance.disks:
2044     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2045       cfg.SetDiskID(top_disk, node)
2046       if not rpc.call_blockdev_shutdown(node, top_disk):
2047         logger.Error("could not shutdown block device %s on node %s" %
2048                      (disk.iv_name, node))
2049         if not ignore_primary or node != instance.primary_node:
2050           result = False
2051   return result
2052
2053
2054 def _CheckNodeFreeMemory(cfg, node, reason, requested):
2055   """Checks if a node has enough free memory.
2056
2057   This function check if a given node has the needed amount of free
2058   memory. In case the node has less memory or we cannot get the
2059   information from the node, this function raise an OpPrereqError
2060   exception.
2061
2062   Args:
2063     - cfg: a ConfigWriter instance
2064     - node: the node name
2065     - reason: string to use in the error message
2066     - requested: the amount of memory in MiB
2067
2068   """
2069   nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
2070   if not nodeinfo or not isinstance(nodeinfo, dict):
2071     raise errors.OpPrereqError("Could not contact node %s for resource"
2072                              " information" % (node,))
2073
2074   free_mem = nodeinfo[node].get('memory_free')
2075   if not isinstance(free_mem, int):
2076     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2077                              " was '%s'" % (node, free_mem))
2078   if requested > free_mem:
2079     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2080                              " needed %s MiB, available %s MiB" %
2081                              (node, reason, requested, free_mem))
2082
2083
2084 class LUStartupInstance(LogicalUnit):
2085   """Starts an instance.
2086
2087   """
2088   HPATH = "instance-start"
2089   HTYPE = constants.HTYPE_INSTANCE
2090   _OP_REQP = ["instance_name", "force"]
2091
2092   def BuildHooksEnv(self):
2093     """Build hooks env.
2094
2095     This runs on master, primary and secondary nodes of the instance.
2096
2097     """
2098     env = {
2099       "FORCE": self.op.force,
2100       }
2101     env.update(_BuildInstanceHookEnvByObject(self.instance))
2102     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2103           list(self.instance.secondary_nodes))
2104     return env, nl, nl
2105
2106   def CheckPrereq(self):
2107     """Check prerequisites.
2108
2109     This checks that the instance is in the cluster.
2110
2111     """
2112     instance = self.cfg.GetInstanceInfo(
2113       self.cfg.ExpandInstanceName(self.op.instance_name))
2114     if instance is None:
2115       raise errors.OpPrereqError("Instance '%s' not known" %
2116                                  self.op.instance_name)
2117
2118     # check bridges existance
2119     _CheckInstanceBridgesExist(instance)
2120
2121     _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2122                          "starting instance %s" % instance.name,
2123                          instance.memory)
2124
2125     self.instance = instance
2126     self.op.instance_name = instance.name
2127
2128   def Exec(self, feedback_fn):
2129     """Start the instance.
2130
2131     """
2132     instance = self.instance
2133     force = self.op.force
2134     extra_args = getattr(self.op, "extra_args", "")
2135
2136     self.cfg.MarkInstanceUp(instance.name)
2137
2138     node_current = instance.primary_node
2139
2140     _StartInstanceDisks(self.cfg, instance, force)
2141
2142     if not rpc.call_instance_start(node_current, instance, extra_args):
2143       _ShutdownInstanceDisks(instance, self.cfg)
2144       raise errors.OpExecError("Could not start instance")
2145
2146
2147 class LURebootInstance(LogicalUnit):
2148   """Reboot an instance.
2149
2150   """
2151   HPATH = "instance-reboot"
2152   HTYPE = constants.HTYPE_INSTANCE
2153   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2154
2155   def BuildHooksEnv(self):
2156     """Build hooks env.
2157
2158     This runs on master, primary and secondary nodes of the instance.
2159
2160     """
2161     env = {
2162       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2163       }
2164     env.update(_BuildInstanceHookEnvByObject(self.instance))
2165     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2166           list(self.instance.secondary_nodes))
2167     return env, nl, nl
2168
2169   def CheckPrereq(self):
2170     """Check prerequisites.
2171
2172     This checks that the instance is in the cluster.
2173
2174     """
2175     instance = self.cfg.GetInstanceInfo(
2176       self.cfg.ExpandInstanceName(self.op.instance_name))
2177     if instance is None:
2178       raise errors.OpPrereqError("Instance '%s' not known" %
2179                                  self.op.instance_name)
2180
2181     # check bridges existance
2182     _CheckInstanceBridgesExist(instance)
2183
2184     self.instance = instance
2185     self.op.instance_name = instance.name
2186
2187   def Exec(self, feedback_fn):
2188     """Reboot the instance.
2189
2190     """
2191     instance = self.instance
2192     ignore_secondaries = self.op.ignore_secondaries
2193     reboot_type = self.op.reboot_type
2194     extra_args = getattr(self.op, "extra_args", "")
2195
2196     node_current = instance.primary_node
2197
2198     if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2199                            constants.INSTANCE_REBOOT_HARD,
2200                            constants.INSTANCE_REBOOT_FULL]:
2201       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2202                                   (constants.INSTANCE_REBOOT_SOFT,
2203                                    constants.INSTANCE_REBOOT_HARD,
2204                                    constants.INSTANCE_REBOOT_FULL))
2205
2206     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2207                        constants.INSTANCE_REBOOT_HARD]:
2208       if not rpc.call_instance_reboot(node_current, instance,
2209                                       reboot_type, extra_args):
2210         raise errors.OpExecError("Could not reboot instance")
2211     else:
2212       if not rpc.call_instance_shutdown(node_current, instance):
2213         raise errors.OpExecError("could not shutdown instance for full reboot")
2214       _ShutdownInstanceDisks(instance, self.cfg)
2215       _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2216       if not rpc.call_instance_start(node_current, instance, extra_args):
2217         _ShutdownInstanceDisks(instance, self.cfg)
2218         raise errors.OpExecError("Could not start instance for full reboot")
2219
2220     self.cfg.MarkInstanceUp(instance.name)
2221
2222
2223 class LUShutdownInstance(LogicalUnit):
2224   """Shutdown an instance.
2225
2226   """
2227   HPATH = "instance-stop"
2228   HTYPE = constants.HTYPE_INSTANCE
2229   _OP_REQP = ["instance_name"]
2230
2231   def BuildHooksEnv(self):
2232     """Build hooks env.
2233
2234     This runs on master, primary and secondary nodes of the instance.
2235
2236     """
2237     env = _BuildInstanceHookEnvByObject(self.instance)
2238     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2239           list(self.instance.secondary_nodes))
2240     return env, nl, nl
2241
2242   def CheckPrereq(self):
2243     """Check prerequisites.
2244
2245     This checks that the instance is in the cluster.
2246
2247     """
2248     instance = self.cfg.GetInstanceInfo(
2249       self.cfg.ExpandInstanceName(self.op.instance_name))
2250     if instance is None:
2251       raise errors.OpPrereqError("Instance '%s' not known" %
2252                                  self.op.instance_name)
2253     self.instance = instance
2254
2255   def Exec(self, feedback_fn):
2256     """Shutdown the instance.
2257
2258     """
2259     instance = self.instance
2260     node_current = instance.primary_node
2261     self.cfg.MarkInstanceDown(instance.name)
2262     if not rpc.call_instance_shutdown(node_current, instance):
2263       logger.Error("could not shutdown instance")
2264
2265     _ShutdownInstanceDisks(instance, self.cfg)
2266
2267
2268 class LUReinstallInstance(LogicalUnit):
2269   """Reinstall an instance.
2270
2271   """
2272   HPATH = "instance-reinstall"
2273   HTYPE = constants.HTYPE_INSTANCE
2274   _OP_REQP = ["instance_name"]
2275
2276   def BuildHooksEnv(self):
2277     """Build hooks env.
2278
2279     This runs on master, primary and secondary nodes of the instance.
2280
2281     """
2282     env = _BuildInstanceHookEnvByObject(self.instance)
2283     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2284           list(self.instance.secondary_nodes))
2285     return env, nl, nl
2286
2287   def CheckPrereq(self):
2288     """Check prerequisites.
2289
2290     This checks that the instance is in the cluster and is not running.
2291
2292     """
2293     instance = self.cfg.GetInstanceInfo(
2294       self.cfg.ExpandInstanceName(self.op.instance_name))
2295     if instance is None:
2296       raise errors.OpPrereqError("Instance '%s' not known" %
2297                                  self.op.instance_name)
2298     if instance.disk_template == constants.DT_DISKLESS:
2299       raise errors.OpPrereqError("Instance '%s' has no disks" %
2300                                  self.op.instance_name)
2301     if instance.status != "down":
2302       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2303                                  self.op.instance_name)
2304     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2305     if remote_info:
2306       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2307                                  (self.op.instance_name,
2308                                   instance.primary_node))
2309
2310     self.op.os_type = getattr(self.op, "os_type", None)
2311     if self.op.os_type is not None:
2312       # OS verification
2313       pnode = self.cfg.GetNodeInfo(
2314         self.cfg.ExpandNodeName(instance.primary_node))
2315       if pnode is None:
2316         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2317                                    self.op.pnode)
2318       os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2319       if not os_obj:
2320         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2321                                    " primary node"  % self.op.os_type)
2322
2323     self.instance = instance
2324
2325   def Exec(self, feedback_fn):
2326     """Reinstall the instance.
2327
2328     """
2329     inst = self.instance
2330
2331     if self.op.os_type is not None:
2332       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2333       inst.os = self.op.os_type
2334       self.cfg.AddInstance(inst)
2335
2336     _StartInstanceDisks(self.cfg, inst, None)
2337     try:
2338       feedback_fn("Running the instance OS create scripts...")
2339       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2340         raise errors.OpExecError("Could not install OS for instance %s"
2341                                  " on node %s" %
2342                                  (inst.name, inst.primary_node))
2343     finally:
2344       _ShutdownInstanceDisks(inst, self.cfg)
2345
2346
2347 class LURenameInstance(LogicalUnit):
2348   """Rename an instance.
2349
2350   """
2351   HPATH = "instance-rename"
2352   HTYPE = constants.HTYPE_INSTANCE
2353   _OP_REQP = ["instance_name", "new_name"]
2354
2355   def BuildHooksEnv(self):
2356     """Build hooks env.
2357
2358     This runs on master, primary and secondary nodes of the instance.
2359
2360     """
2361     env = _BuildInstanceHookEnvByObject(self.instance)
2362     env["INSTANCE_NEW_NAME"] = self.op.new_name
2363     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2364           list(self.instance.secondary_nodes))
2365     return env, nl, nl
2366
2367   def CheckPrereq(self):
2368     """Check prerequisites.
2369
2370     This checks that the instance is in the cluster and is not running.
2371
2372     """
2373     instance = self.cfg.GetInstanceInfo(
2374       self.cfg.ExpandInstanceName(self.op.instance_name))
2375     if instance is None:
2376       raise errors.OpPrereqError("Instance '%s' not known" %
2377                                  self.op.instance_name)
2378     if instance.status != "down":
2379       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2380                                  self.op.instance_name)
2381     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2382     if remote_info:
2383       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2384                                  (self.op.instance_name,
2385                                   instance.primary_node))
2386     self.instance = instance
2387
2388     # new name verification
2389     name_info = utils.HostInfo(self.op.new_name)
2390
2391     self.op.new_name = new_name = name_info.name
2392     instance_list = self.cfg.GetInstanceList()
2393     if new_name in instance_list:
2394       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2395                                  instance_name)
2396
2397     if not getattr(self.op, "ignore_ip", False):
2398       command = ["fping", "-q", name_info.ip]
2399       result = utils.RunCmd(command)
2400       if not result.failed:
2401         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2402                                    (name_info.ip, new_name))
2403
2404
2405   def Exec(self, feedback_fn):
2406     """Reinstall the instance.
2407
2408     """
2409     inst = self.instance
2410     old_name = inst.name
2411
2412     self.cfg.RenameInstance(inst.name, self.op.new_name)
2413
2414     # re-read the instance from the configuration after rename
2415     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2416
2417     _StartInstanceDisks(self.cfg, inst, None)
2418     try:
2419       if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2420                                           "sda", "sdb"):
2421         msg = ("Could run OS rename script for instance %s on node %s (but the"
2422                " instance has been renamed in Ganeti)" %
2423                (inst.name, inst.primary_node))
2424         logger.Error(msg)
2425     finally:
2426       _ShutdownInstanceDisks(inst, self.cfg)
2427
2428
2429 class LURemoveInstance(LogicalUnit):
2430   """Remove an instance.
2431
2432   """
2433   HPATH = "instance-remove"
2434   HTYPE = constants.HTYPE_INSTANCE
2435   _OP_REQP = ["instance_name"]
2436
2437   def BuildHooksEnv(self):
2438     """Build hooks env.
2439
2440     This runs on master, primary and secondary nodes of the instance.
2441
2442     """
2443     env = _BuildInstanceHookEnvByObject(self.instance)
2444     nl = [self.sstore.GetMasterNode()]
2445     return env, nl, nl
2446
2447   def CheckPrereq(self):
2448     """Check prerequisites.
2449
2450     This checks that the instance is in the cluster.
2451
2452     """
2453     instance = self.cfg.GetInstanceInfo(
2454       self.cfg.ExpandInstanceName(self.op.instance_name))
2455     if instance is None:
2456       raise errors.OpPrereqError("Instance '%s' not known" %
2457                                  self.op.instance_name)
2458     self.instance = instance
2459
2460   def Exec(self, feedback_fn):
2461     """Remove the instance.
2462
2463     """
2464     instance = self.instance
2465     logger.Info("shutting down instance %s on node %s" %
2466                 (instance.name, instance.primary_node))
2467
2468     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2469       if self.op.ignore_failures:
2470         feedback_fn("Warning: can't shutdown instance")
2471       else:
2472         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2473                                  (instance.name, instance.primary_node))
2474
2475     logger.Info("removing block devices for instance %s" % instance.name)
2476
2477     if not _RemoveDisks(instance, self.cfg):
2478       if self.op.ignore_failures:
2479         feedback_fn("Warning: can't remove instance's disks")
2480       else:
2481         raise errors.OpExecError("Can't remove instance's disks")
2482
2483     logger.Info("removing instance %s out of cluster config" % instance.name)
2484
2485     self.cfg.RemoveInstance(instance.name)
2486
2487
2488 class LUQueryInstances(NoHooksLU):
2489   """Logical unit for querying instances.
2490
2491   """
2492   _OP_REQP = ["output_fields", "names"]
2493
2494   def CheckPrereq(self):
2495     """Check prerequisites.
2496
2497     This checks that the fields required are valid output fields.
2498
2499     """
2500     self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2501     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2502                                "admin_state", "admin_ram",
2503                                "disk_template", "ip", "mac", "bridge",
2504                                "sda_size", "sdb_size", "vcpus"],
2505                        dynamic=self.dynamic_fields,
2506                        selected=self.op.output_fields)
2507
2508     self.wanted = _GetWantedInstances(self, self.op.names)
2509
2510   def Exec(self, feedback_fn):
2511     """Computes the list of nodes and their attributes.
2512
2513     """
2514     instance_names = self.wanted
2515     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2516                      in instance_names]
2517
2518     # begin data gathering
2519
2520     nodes = frozenset([inst.primary_node for inst in instance_list])
2521
2522     bad_nodes = []
2523     if self.dynamic_fields.intersection(self.op.output_fields):
2524       live_data = {}
2525       node_data = rpc.call_all_instances_info(nodes)
2526       for name in nodes:
2527         result = node_data[name]
2528         if result:
2529           live_data.update(result)
2530         elif result == False:
2531           bad_nodes.append(name)
2532         # else no instance is alive
2533     else:
2534       live_data = dict([(name, {}) for name in instance_names])
2535
2536     # end data gathering
2537
2538     output = []
2539     for instance in instance_list:
2540       iout = []
2541       for field in self.op.output_fields:
2542         if field == "name":
2543           val = instance.name
2544         elif field == "os":
2545           val = instance.os
2546         elif field == "pnode":
2547           val = instance.primary_node
2548         elif field == "snodes":
2549           val = list(instance.secondary_nodes)
2550         elif field == "admin_state":
2551           val = (instance.status != "down")
2552         elif field == "oper_state":
2553           if instance.primary_node in bad_nodes:
2554             val = None
2555           else:
2556             val = bool(live_data.get(instance.name))
2557         elif field == "status":
2558           if instance.primary_node in bad_nodes:
2559             val = "ERROR_nodedown"
2560           else:
2561             running = bool(live_data.get(instance.name))
2562             if running:
2563               if instance.status != "down":
2564                 val = "running"
2565               else:
2566                 val = "ERROR_up"
2567             else:
2568               if instance.status != "down":
2569                 val = "ERROR_down"
2570               else:
2571                 val = "ADMIN_down"
2572         elif field == "admin_ram":
2573           val = instance.memory
2574         elif field == "oper_ram":
2575           if instance.primary_node in bad_nodes:
2576             val = None
2577           elif instance.name in live_data:
2578             val = live_data[instance.name].get("memory", "?")
2579           else:
2580             val = "-"
2581         elif field == "disk_template":
2582           val = instance.disk_template
2583         elif field == "ip":
2584           val = instance.nics[0].ip
2585         elif field == "bridge":
2586           val = instance.nics[0].bridge
2587         elif field == "mac":
2588           val = instance.nics[0].mac
2589         elif field == "sda_size" or field == "sdb_size":
2590           disk = instance.FindDisk(field[:3])
2591           if disk is None:
2592             val = None
2593           else:
2594             val = disk.size
2595         elif field == "vcpus":
2596           val = instance.vcpus
2597         else:
2598           raise errors.ParameterError(field)
2599         iout.append(val)
2600       output.append(iout)
2601
2602     return output
2603
2604
2605 class LUFailoverInstance(LogicalUnit):
2606   """Failover an instance.
2607
2608   """
2609   HPATH = "instance-failover"
2610   HTYPE = constants.HTYPE_INSTANCE
2611   _OP_REQP = ["instance_name", "ignore_consistency"]
2612
2613   def BuildHooksEnv(self):
2614     """Build hooks env.
2615
2616     This runs on master, primary and secondary nodes of the instance.
2617
2618     """
2619     env = {
2620       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2621       }
2622     env.update(_BuildInstanceHookEnvByObject(self.instance))
2623     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2624     return env, nl, nl
2625
2626   def CheckPrereq(self):
2627     """Check prerequisites.
2628
2629     This checks that the instance is in the cluster.
2630
2631     """
2632     instance = self.cfg.GetInstanceInfo(
2633       self.cfg.ExpandInstanceName(self.op.instance_name))
2634     if instance is None:
2635       raise errors.OpPrereqError("Instance '%s' not known" %
2636                                  self.op.instance_name)
2637
2638     if instance.disk_template not in constants.DTS_NET_MIRROR:
2639       raise errors.OpPrereqError("Instance's disk layout is not"
2640                                  " network mirrored, cannot failover.")
2641
2642     secondary_nodes = instance.secondary_nodes
2643     if not secondary_nodes:
2644       raise errors.ProgrammerError("no secondary node but using "
2645                                    "DT_REMOTE_RAID1 template")
2646
2647     target_node = secondary_nodes[0]
2648     # check memory requirements on the secondary node
2649     _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2650                          instance.name, instance.memory)
2651
2652     # check bridge existance
2653     brlist = [nic.bridge for nic in instance.nics]
2654     if not rpc.call_bridges_exist(target_node, brlist):
2655       raise errors.OpPrereqError("One or more target bridges %s does not"
2656                                  " exist on destination node '%s'" %
2657                                  (brlist, target_node))
2658
2659     self.instance = instance
2660
2661   def Exec(self, feedback_fn):
2662     """Failover an instance.
2663
2664     The failover is done by shutting it down on its present node and
2665     starting it on the secondary.
2666
2667     """
2668     instance = self.instance
2669
2670     source_node = instance.primary_node
2671     target_node = instance.secondary_nodes[0]
2672
2673     feedback_fn("* checking disk consistency between source and target")
2674     for dev in instance.disks:
2675       # for remote_raid1, these are md over drbd
2676       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2677         if instance.status == "up" and not self.op.ignore_consistency:
2678           raise errors.OpExecError("Disk %s is degraded on target node,"
2679                                    " aborting failover." % dev.iv_name)
2680
2681     feedback_fn("* shutting down instance on source node")
2682     logger.Info("Shutting down instance %s on node %s" %
2683                 (instance.name, source_node))
2684
2685     if not rpc.call_instance_shutdown(source_node, instance):
2686       if self.op.ignore_consistency:
2687         logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2688                      " anyway. Please make sure node %s is down"  %
2689                      (instance.name, source_node, source_node))
2690       else:
2691         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2692                                  (instance.name, source_node))
2693
2694     feedback_fn("* deactivating the instance's disks on source node")
2695     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2696       raise errors.OpExecError("Can't shut down the instance's disks.")
2697
2698     instance.primary_node = target_node
2699     # distribute new instance config to the other nodes
2700     self.cfg.AddInstance(instance)
2701
2702     # Only start the instance if it's marked as up
2703     if instance.status == "up":
2704       feedback_fn("* activating the instance's disks on target node")
2705       logger.Info("Starting instance %s on node %s" %
2706                   (instance.name, target_node))
2707
2708       disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2709                                                ignore_secondaries=True)
2710       if not disks_ok:
2711         _ShutdownInstanceDisks(instance, self.cfg)
2712         raise errors.OpExecError("Can't activate the instance's disks")
2713
2714       feedback_fn("* starting the instance on the target node")
2715       if not rpc.call_instance_start(target_node, instance, None):
2716         _ShutdownInstanceDisks(instance, self.cfg)
2717         raise errors.OpExecError("Could not start instance %s on node %s." %
2718                                  (instance.name, target_node))
2719
2720
2721 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2722   """Create a tree of block devices on the primary node.
2723
2724   This always creates all devices.
2725
2726   """
2727   if device.children:
2728     for child in device.children:
2729       if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2730         return False
2731
2732   cfg.SetDiskID(device, node)
2733   new_id = rpc.call_blockdev_create(node, device, device.size,
2734                                     instance.name, True, info)
2735   if not new_id:
2736     return False
2737   if device.physical_id is None:
2738     device.physical_id = new_id
2739   return True
2740
2741
2742 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2743   """Create a tree of block devices on a secondary node.
2744
2745   If this device type has to be created on secondaries, create it and
2746   all its children.
2747
2748   If not, just recurse to children keeping the same 'force' value.
2749
2750   """
2751   if device.CreateOnSecondary():
2752     force = True
2753   if device.children:
2754     for child in device.children:
2755       if not _CreateBlockDevOnSecondary(cfg, node, instance,
2756                                         child, force, info):
2757         return False
2758
2759   if not force:
2760     return True
2761   cfg.SetDiskID(device, node)
2762   new_id = rpc.call_blockdev_create(node, device, device.size,
2763                                     instance.name, False, info)
2764   if not new_id:
2765     return False
2766   if device.physical_id is None:
2767     device.physical_id = new_id
2768   return True
2769
2770
2771 def _GenerateUniqueNames(cfg, exts):
2772   """Generate a suitable LV name.
2773
2774   This will generate a logical volume name for the given instance.
2775
2776   """
2777   results = []
2778   for val in exts:
2779     new_id = cfg.GenerateUniqueID()
2780     results.append("%s%s" % (new_id, val))
2781   return results
2782
2783
2784 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2785   """Generate a drbd device complete with its children.
2786
2787   """
2788   port = cfg.AllocatePort()
2789   vgname = cfg.GetVGName()
2790   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2791                           logical_id=(vgname, names[0]))
2792   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2793                           logical_id=(vgname, names[1]))
2794   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2795                           logical_id = (primary, secondary, port),
2796                           children = [dev_data, dev_meta])
2797   return drbd_dev
2798
2799
2800 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2801   """Generate a drbd8 device complete with its children.
2802
2803   """
2804   port = cfg.AllocatePort()
2805   vgname = cfg.GetVGName()
2806   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2807                           logical_id=(vgname, names[0]))
2808   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2809                           logical_id=(vgname, names[1]))
2810   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2811                           logical_id = (primary, secondary, port),
2812                           children = [dev_data, dev_meta],
2813                           iv_name=iv_name)
2814   return drbd_dev
2815
2816 def _GenerateDiskTemplate(cfg, template_name,
2817                           instance_name, primary_node,
2818                           secondary_nodes, disk_sz, swap_sz):
2819   """Generate the entire disk layout for a given template type.
2820
2821   """
2822   #TODO: compute space requirements
2823
2824   vgname = cfg.GetVGName()
2825   if template_name == constants.DT_DISKLESS:
2826     disks = []
2827   elif template_name == constants.DT_PLAIN:
2828     if len(secondary_nodes) != 0:
2829       raise errors.ProgrammerError("Wrong template configuration")
2830
2831     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2832     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2833                            logical_id=(vgname, names[0]),
2834                            iv_name = "sda")
2835     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2836                            logical_id=(vgname, names[1]),
2837                            iv_name = "sdb")
2838     disks = [sda_dev, sdb_dev]
2839   elif template_name == constants.DT_LOCAL_RAID1:
2840     if len(secondary_nodes) != 0:
2841       raise errors.ProgrammerError("Wrong template configuration")
2842
2843
2844     names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2845                                        ".sdb_m1", ".sdb_m2"])
2846     sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2847                               logical_id=(vgname, names[0]))
2848     sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2849                               logical_id=(vgname, names[1]))
2850     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
2851                               size=disk_sz,
2852                               children = [sda_dev_m1, sda_dev_m2])
2853     sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2854                               logical_id=(vgname, names[2]))
2855     sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2856                               logical_id=(vgname, names[3]))
2857     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
2858                               size=swap_sz,
2859                               children = [sdb_dev_m1, sdb_dev_m2])
2860     disks = [md_sda_dev, md_sdb_dev]
2861   elif template_name == constants.DT_REMOTE_RAID1:
2862     if len(secondary_nodes) != 1:
2863       raise errors.ProgrammerError("Wrong template configuration")
2864     remote_node = secondary_nodes[0]
2865     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2866                                        ".sdb_data", ".sdb_meta"])
2867     drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2868                                          disk_sz, names[0:2])
2869     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
2870                               children = [drbd_sda_dev], size=disk_sz)
2871     drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2872                                          swap_sz, names[2:4])
2873     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
2874                               children = [drbd_sdb_dev], size=swap_sz)
2875     disks = [md_sda_dev, md_sdb_dev]
2876   elif template_name == constants.DT_DRBD8:
2877     if len(secondary_nodes) != 1:
2878       raise errors.ProgrammerError("Wrong template configuration")
2879     remote_node = secondary_nodes[0]
2880     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2881                                        ".sdb_data", ".sdb_meta"])
2882     drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2883                                          disk_sz, names[0:2], "sda")
2884     drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2885                                          swap_sz, names[2:4], "sdb")
2886     disks = [drbd_sda_dev, drbd_sdb_dev]
2887   else:
2888     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2889   return disks
2890
2891
2892 def _GetInstanceInfoText(instance):
2893   """Compute that text that should be added to the disk's metadata.
2894
2895   """
2896   return "originstname+%s" % instance.name
2897
2898
2899 def _CreateDisks(cfg, instance):
2900   """Create all disks for an instance.
2901
2902   This abstracts away some work from AddInstance.
2903
2904   Args:
2905     instance: the instance object
2906
2907   Returns:
2908     True or False showing the success of the creation process
2909
2910   """
2911   info = _GetInstanceInfoText(instance)
2912
2913   for device in instance.disks:
2914     logger.Info("creating volume %s for instance %s" %
2915               (device.iv_name, instance.name))
2916     #HARDCODE
2917     for secondary_node in instance.secondary_nodes:
2918       if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2919                                         device, False, info):
2920         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2921                      (device.iv_name, device, secondary_node))
2922         return False
2923     #HARDCODE
2924     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2925                                     instance, device, info):
2926       logger.Error("failed to create volume %s on primary!" %
2927                    device.iv_name)
2928       return False
2929   return True
2930
2931
2932 def _RemoveDisks(instance, cfg):
2933   """Remove all disks for an instance.
2934
2935   This abstracts away some work from `AddInstance()` and
2936   `RemoveInstance()`. Note that in case some of the devices couldn't
2937   be removed, the removal will continue with the other ones (compare
2938   with `_CreateDisks()`).
2939
2940   Args:
2941     instance: the instance object
2942
2943   Returns:
2944     True or False showing the success of the removal proces
2945
2946   """
2947   logger.Info("removing block devices for instance %s" % instance.name)
2948
2949   result = True
2950   for device in instance.disks:
2951     for node, disk in device.ComputeNodeTree(instance.primary_node):
2952       cfg.SetDiskID(disk, node)
2953       if not rpc.call_blockdev_remove(node, disk):
2954         logger.Error("could not remove block device %s on node %s,"
2955                      " continuing anyway" %
2956                      (device.iv_name, node))
2957         result = False
2958   return result
2959
2960
2961 class LUCreateInstance(LogicalUnit):
2962   """Create an instance.
2963
2964   """
2965   HPATH = "instance-add"
2966   HTYPE = constants.HTYPE_INSTANCE
2967   _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2968               "disk_template", "swap_size", "mode", "start", "vcpus",
2969               "wait_for_sync", "ip_check", "mac"]
2970
2971   def BuildHooksEnv(self):
2972     """Build hooks env.
2973
2974     This runs on master, primary and secondary nodes of the instance.
2975
2976     """
2977     env = {
2978       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2979       "INSTANCE_DISK_SIZE": self.op.disk_size,
2980       "INSTANCE_SWAP_SIZE": self.op.swap_size,
2981       "INSTANCE_ADD_MODE": self.op.mode,
2982       }
2983     if self.op.mode == constants.INSTANCE_IMPORT:
2984       env["INSTANCE_SRC_NODE"] = self.op.src_node
2985       env["INSTANCE_SRC_PATH"] = self.op.src_path
2986       env["INSTANCE_SRC_IMAGE"] = self.src_image
2987
2988     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2989       primary_node=self.op.pnode,
2990       secondary_nodes=self.secondaries,
2991       status=self.instance_status,
2992       os_type=self.op.os_type,
2993       memory=self.op.mem_size,
2994       vcpus=self.op.vcpus,
2995       nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
2996     ))
2997
2998     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2999           self.secondaries)
3000     return env, nl, nl
3001
3002
3003   def CheckPrereq(self):
3004     """Check prerequisites.
3005
3006     """
3007     for attr in ["kernel_path", "initrd_path", "hvm_boot_order"]:
3008       if not hasattr(self.op, attr):
3009         setattr(self.op, attr, None)
3010
3011     if self.op.mode not in (constants.INSTANCE_CREATE,
3012                             constants.INSTANCE_IMPORT):
3013       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3014                                  self.op.mode)
3015
3016     if self.op.mode == constants.INSTANCE_IMPORT:
3017       src_node = getattr(self.op, "src_node", None)
3018       src_path = getattr(self.op, "src_path", None)
3019       if src_node is None or src_path is None:
3020         raise errors.OpPrereqError("Importing an instance requires source"
3021                                    " node and path options")
3022       src_node_full = self.cfg.ExpandNodeName(src_node)
3023       if src_node_full is None:
3024         raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3025       self.op.src_node = src_node = src_node_full
3026
3027       if not os.path.isabs(src_path):
3028         raise errors.OpPrereqError("The source path must be absolute")
3029
3030       export_info = rpc.call_export_info(src_node, src_path)
3031
3032       if not export_info:
3033         raise errors.OpPrereqError("No export found in dir %s" % src_path)
3034
3035       if not export_info.has_section(constants.INISECT_EXP):
3036         raise errors.ProgrammerError("Corrupted export config")
3037
3038       ei_version = export_info.get(constants.INISECT_EXP, 'version')
3039       if (int(ei_version) != constants.EXPORT_VERSION):
3040         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3041                                    (ei_version, constants.EXPORT_VERSION))
3042
3043       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3044         raise errors.OpPrereqError("Can't import instance with more than"
3045                                    " one data disk")
3046
3047       # FIXME: are the old os-es, disk sizes, etc. useful?
3048       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3049       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3050                                                          'disk0_dump'))
3051       self.src_image = diskimage
3052     else: # INSTANCE_CREATE
3053       if getattr(self.op, "os_type", None) is None:
3054         raise errors.OpPrereqError("No guest OS specified")
3055
3056     # check primary node
3057     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3058     if pnode is None:
3059       raise errors.OpPrereqError("Primary node '%s' is unknown" %
3060                                  self.op.pnode)
3061     self.op.pnode = pnode.name
3062     self.pnode = pnode
3063     self.secondaries = []
3064     # disk template and mirror node verification
3065     if self.op.disk_template not in constants.DISK_TEMPLATES:
3066       raise errors.OpPrereqError("Invalid disk template name")
3067
3068     if self.op.disk_template in constants.DTS_NET_MIRROR:
3069       if getattr(self.op, "snode", None) is None:
3070         raise errors.OpPrereqError("The networked disk templates need"
3071                                    " a mirror node")
3072
3073       snode_name = self.cfg.ExpandNodeName(self.op.snode)
3074       if snode_name is None:
3075         raise errors.OpPrereqError("Unknown secondary node '%s'" %
3076                                    self.op.snode)
3077       elif snode_name == pnode.name:
3078         raise errors.OpPrereqError("The secondary node cannot be"
3079                                    " the primary node.")
3080       self.secondaries.append(snode_name)
3081
3082     # Required free disk space as a function of disk and swap space
3083     req_size_dict = {
3084       constants.DT_DISKLESS: None,
3085       constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
3086       constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
3087       # 256 MB are added for drbd metadata, 128MB for each drbd device
3088       constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
3089       constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
3090     }
3091
3092     if self.op.disk_template not in req_size_dict:
3093       raise errors.ProgrammerError("Disk template '%s' size requirement"
3094                                    " is unknown" %  self.op.disk_template)
3095
3096     req_size = req_size_dict[self.op.disk_template]
3097
3098     # Check lv size requirements
3099     if req_size is not None:
3100       nodenames = [pnode.name] + self.secondaries
3101       nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3102       for node in nodenames:
3103         info = nodeinfo.get(node, None)
3104         if not info:
3105           raise errors.OpPrereqError("Cannot get current information"
3106                                      " from node '%s'" % nodeinfo)
3107         vg_free = info.get('vg_free', None)
3108         if not isinstance(vg_free, int):
3109           raise errors.OpPrereqError("Can't compute free disk space on"
3110                                      " node %s" % node)
3111         if req_size > info['vg_free']:
3112           raise errors.OpPrereqError("Not enough disk space on target node %s."
3113                                      " %d MB available, %d MB required" %
3114                                      (node, info['vg_free'], req_size))
3115
3116     # os verification
3117     os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3118     if not os_obj:
3119       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3120                                  " primary node"  % self.op.os_type)
3121
3122     if self.op.kernel_path == constants.VALUE_NONE:
3123       raise errors.OpPrereqError("Can't set instance kernel to none")
3124
3125     # instance verification
3126     hostname1 = utils.HostInfo(self.op.instance_name)
3127
3128     self.op.instance_name = instance_name = hostname1.name
3129     instance_list = self.cfg.GetInstanceList()
3130     if instance_name in instance_list:
3131       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3132                                  instance_name)
3133
3134     ip = getattr(self.op, "ip", None)
3135     if ip is None or ip.lower() == "none":
3136       inst_ip = None
3137     elif ip.lower() == "auto":
3138       inst_ip = hostname1.ip
3139     else:
3140       if not utils.IsValidIP(ip):
3141         raise errors.OpPrereqError("given IP address '%s' doesn't look"
3142                                    " like a valid IP" % ip)
3143       inst_ip = ip
3144     self.inst_ip = inst_ip
3145
3146     if self.op.start and not self.op.ip_check:
3147       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3148                                  " adding an instance in start mode")
3149
3150     if self.op.ip_check:
3151       if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3152         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3153                                    (hostname1.ip, instance_name))
3154
3155     # MAC address verification
3156     if self.op.mac != "auto":
3157       if not utils.IsValidMac(self.op.mac.lower()):
3158         raise errors.OpPrereqError("invalid MAC address specified: %s" %
3159                                    self.op.mac)
3160
3161     # bridge verification
3162     bridge = getattr(self.op, "bridge", None)
3163     if bridge is None:
3164       self.op.bridge = self.cfg.GetDefBridge()
3165     else:
3166       self.op.bridge = bridge
3167
3168     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3169       raise errors.OpPrereqError("target bridge '%s' does not exist on"
3170                                  " destination node '%s'" %
3171                                  (self.op.bridge, pnode.name))
3172
3173     # boot order verification
3174     if self.op.hvm_boot_order is not None:
3175       if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3176         raise errors.OpPrereqError("invalid boot order specified,"
3177                                    " must be one or more of [acdn]")
3178
3179     if self.op.start:
3180       self.instance_status = 'up'
3181     else:
3182       self.instance_status = 'down'
3183
3184   def Exec(self, feedback_fn):
3185     """Create and add the instance to the cluster.
3186
3187     """
3188     instance = self.op.instance_name
3189     pnode_name = self.pnode.name
3190
3191     if self.op.mac == "auto":
3192       mac_address = self.cfg.GenerateMAC()
3193     else:
3194       mac_address = self.op.mac
3195
3196     nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3197     if self.inst_ip is not None:
3198       nic.ip = self.inst_ip
3199
3200     ht_kind = self.sstore.GetHypervisorType()
3201     if ht_kind in constants.HTS_REQ_PORT:
3202       network_port = self.cfg.AllocatePort()
3203     else:
3204       network_port = None
3205
3206     disks = _GenerateDiskTemplate(self.cfg,
3207                                   self.op.disk_template,
3208                                   instance, pnode_name,
3209                                   self.secondaries, self.op.disk_size,
3210                                   self.op.swap_size)
3211
3212     iobj = objects.Instance(name=instance, os=self.op.os_type,
3213                             primary_node=pnode_name,
3214                             memory=self.op.mem_size,
3215                             vcpus=self.op.vcpus,
3216                             nics=[nic], disks=disks,
3217                             disk_template=self.op.disk_template,
3218                             status=self.instance_status,
3219                             network_port=network_port,
3220                             kernel_path=self.op.kernel_path,
3221                             initrd_path=self.op.initrd_path,
3222                             hvm_boot_order=self.op.hvm_boot_order,
3223                             )
3224
3225     feedback_fn("* creating instance disks...")
3226     if not _CreateDisks(self.cfg, iobj):
3227       _RemoveDisks(iobj, self.cfg)
3228       raise errors.OpExecError("Device creation failed, reverting...")
3229
3230     feedback_fn("adding instance %s to cluster config" % instance)
3231
3232     self.cfg.AddInstance(iobj)
3233
3234     if self.op.wait_for_sync:
3235       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3236     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3237       # make sure the disks are not degraded (still sync-ing is ok)
3238       time.sleep(15)
3239       feedback_fn("* checking mirrors status")
3240       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3241     else:
3242       disk_abort = False
3243
3244     if disk_abort:
3245       _RemoveDisks(iobj, self.cfg)
3246       self.cfg.RemoveInstance(iobj.name)
3247       raise errors.OpExecError("There are some degraded disks for"
3248                                " this instance")
3249
3250     feedback_fn("creating os for instance %s on node %s" %
3251                 (instance, pnode_name))
3252
3253     if iobj.disk_template != constants.DT_DISKLESS:
3254       if self.op.mode == constants.INSTANCE_CREATE:
3255         feedback_fn("* running the instance OS create scripts...")
3256         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3257           raise errors.OpExecError("could not add os for instance %s"
3258                                    " on node %s" %
3259                                    (instance, pnode_name))
3260
3261       elif self.op.mode == constants.INSTANCE_IMPORT:
3262         feedback_fn("* running the instance OS import scripts...")
3263         src_node = self.op.src_node
3264         src_image = self.src_image
3265         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3266                                                 src_node, src_image):
3267           raise errors.OpExecError("Could not import os for instance"
3268                                    " %s on node %s" %
3269                                    (instance, pnode_name))
3270       else:
3271         # also checked in the prereq part
3272         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3273                                      % self.op.mode)
3274
3275     if self.op.start:
3276       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3277       feedback_fn("* starting instance...")
3278       if not rpc.call_instance_start(pnode_name, iobj, None):
3279         raise errors.OpExecError("Could not start instance")
3280
3281
3282 class LUConnectConsole(NoHooksLU):
3283   """Connect to an instance's console.
3284
3285   This is somewhat special in that it returns the command line that
3286   you need to run on the master node in order to connect to the
3287   console.
3288
3289   """
3290   _OP_REQP = ["instance_name"]
3291
3292   def CheckPrereq(self):
3293     """Check prerequisites.
3294
3295     This checks that the instance is in the cluster.
3296
3297     """
3298     instance = self.cfg.GetInstanceInfo(
3299       self.cfg.ExpandInstanceName(self.op.instance_name))
3300     if instance is None:
3301       raise errors.OpPrereqError("Instance '%s' not known" %
3302                                  self.op.instance_name)
3303     self.instance = instance
3304
3305   def Exec(self, feedback_fn):
3306     """Connect to the console of an instance
3307
3308     """
3309     instance = self.instance
3310     node = instance.primary_node
3311
3312     node_insts = rpc.call_instance_list([node])[node]
3313     if node_insts is False:
3314       raise errors.OpExecError("Can't connect to node %s." % node)
3315
3316     if instance.name not in node_insts:
3317       raise errors.OpExecError("Instance %s is not running." % instance.name)
3318
3319     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3320
3321     hyper = hypervisor.GetHypervisor()
3322     console_cmd = hyper.GetShellCommandForConsole(instance)
3323     # build ssh cmdline
3324     argv = ["ssh", "-q", "-t"]
3325     argv.extend(ssh.KNOWN_HOSTS_OPTS)
3326     argv.extend(ssh.BATCH_MODE_OPTS)
3327     argv.append(node)
3328     argv.append(console_cmd)
3329     return "ssh", argv
3330
3331
3332 class LUAddMDDRBDComponent(LogicalUnit):
3333   """Adda new mirror member to an instance's disk.
3334
3335   """
3336   HPATH = "mirror-add"
3337   HTYPE = constants.HTYPE_INSTANCE
3338   _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3339
3340   def BuildHooksEnv(self):
3341     """Build hooks env.
3342
3343     This runs on the master, the primary and all the secondaries.
3344
3345     """
3346     env = {
3347       "NEW_SECONDARY": self.op.remote_node,
3348       "DISK_NAME": self.op.disk_name,
3349       }
3350     env.update(_BuildInstanceHookEnvByObject(self.instance))
3351     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3352           self.op.remote_node,] + list(self.instance.secondary_nodes)
3353     return env, nl, nl
3354
3355   def CheckPrereq(self):
3356     """Check prerequisites.
3357
3358     This checks that the instance is in the cluster.
3359
3360     """
3361     instance = self.cfg.GetInstanceInfo(
3362       self.cfg.ExpandInstanceName(self.op.instance_name))
3363     if instance is None:
3364       raise errors.OpPrereqError("Instance '%s' not known" %
3365                                  self.op.instance_name)
3366     self.instance = instance
3367
3368     remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3369     if remote_node is None:
3370       raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3371     self.remote_node = remote_node
3372
3373     if remote_node == instance.primary_node:
3374       raise errors.OpPrereqError("The specified node is the primary node of"
3375                                  " the instance.")
3376
3377     if instance.disk_template != constants.DT_REMOTE_RAID1:
3378       raise errors.OpPrereqError("Instance's disk layout is not"
3379                                  " remote_raid1.")
3380     for disk in instance.disks:
3381       if disk.iv_name == self.op.disk_name:
3382         break
3383     else:
3384       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3385                                  " instance." % self.op.disk_name)
3386     if len(disk.children) > 1:
3387       raise errors.OpPrereqError("The device already has two slave devices."
3388                                  " This would create a 3-disk raid1 which we"
3389                                  " don't allow.")
3390     self.disk = disk
3391
3392   def Exec(self, feedback_fn):
3393     """Add the mirror component
3394
3395     """
3396     disk = self.disk
3397     instance = self.instance
3398
3399     remote_node = self.remote_node
3400     lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3401     names = _GenerateUniqueNames(self.cfg, lv_names)
3402     new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3403                                      remote_node, disk.size, names)
3404
3405     logger.Info("adding new mirror component on secondary")
3406     #HARDCODE
3407     if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
3408                                       new_drbd, False,
3409                                       _GetInstanceInfoText(instance)):
3410       raise errors.OpExecError("Failed to create new component on secondary"
3411                                " node %s" % remote_node)
3412
3413     logger.Info("adding new mirror component on primary")
3414     #HARDCODE
3415     if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
3416                                     instance, new_drbd,
3417                                     _GetInstanceInfoText(instance)):
3418       # remove secondary dev
3419       self.cfg.SetDiskID(new_drbd, remote_node)
3420       rpc.call_blockdev_remove(remote_node, new_drbd)
3421       raise errors.OpExecError("Failed to create volume on primary")
3422
3423     # the device exists now
3424     # call the primary node to add the mirror to md
3425     logger.Info("adding new mirror component to md")
3426     if not rpc.call_blockdev_addchildren(instance.primary_node,
3427                                          disk, [new_drbd]):
3428       logger.Error("Can't add mirror compoment to md!")
3429       self.cfg.SetDiskID(new_drbd, remote_node)
3430       if not rpc.call_blockdev_remove(remote_node, new_drbd):
3431         logger.Error("Can't rollback on secondary")
3432       self.cfg.SetDiskID(new_drbd, instance.primary_node)
3433       if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3434         logger.Error("Can't rollback on primary")
3435       raise errors.OpExecError("Can't add mirror component to md array")
3436
3437     disk.children.append(new_drbd)
3438
3439     self.cfg.AddInstance(instance)
3440
3441     _WaitForSync(self.cfg, instance, self.proc)
3442
3443     return 0
3444
3445
3446 class LURemoveMDDRBDComponent(LogicalUnit):
3447   """Remove a component from a remote_raid1 disk.
3448
3449   """
3450   HPATH = "mirror-remove"
3451   HTYPE = constants.HTYPE_INSTANCE
3452   _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3453
3454   def BuildHooksEnv(self):
3455     """Build hooks env.
3456
3457     This runs on the master, the primary and all the secondaries.
3458
3459     """
3460     env = {
3461       "DISK_NAME": self.op.disk_name,
3462       "DISK_ID": self.op.disk_id,
3463       "OLD_SECONDARY": self.old_secondary,
3464       }
3465     env.update(_BuildInstanceHookEnvByObject(self.instance))
3466     nl = [self.sstore.GetMasterNode(),
3467           self.instance.primary_node] + list(self.instance.secondary_nodes)
3468     return env, nl, nl
3469
3470   def CheckPrereq(self):
3471     """Check prerequisites.
3472
3473     This checks that the instance is in the cluster.
3474
3475     """
3476     instance = self.cfg.GetInstanceInfo(
3477       self.cfg.ExpandInstanceName(self.op.instance_name))
3478     if instance is None:
3479       raise errors.OpPrereqError("Instance '%s' not known" %
3480                                  self.op.instance_name)
3481     self.instance = instance
3482
3483     if instance.disk_template != constants.DT_REMOTE_RAID1:
3484       raise errors.OpPrereqError("Instance's disk layout is not"
3485                                  " remote_raid1.")
3486     for disk in instance.disks:
3487       if disk.iv_name == self.op.disk_name:
3488         break
3489     else:
3490       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3491                                  " instance." % self.op.disk_name)
3492     for child in disk.children:
3493       if (child.dev_type == constants.LD_DRBD7 and
3494           child.logical_id[2] == self.op.disk_id):
3495         break
3496     else:
3497       raise errors.OpPrereqError("Can't find the device with this port.")
3498
3499     if len(disk.children) < 2:
3500       raise errors.OpPrereqError("Cannot remove the last component from"
3501                                  " a mirror.")
3502     self.disk = disk
3503     self.child = child
3504     if self.child.logical_id[0] == instance.primary_node:
3505       oid = 1
3506     else:
3507       oid = 0
3508     self.old_secondary = self.child.logical_id[oid]
3509
3510   def Exec(self, feedback_fn):
3511     """Remove the mirror component
3512
3513     """
3514     instance = self.instance
3515     disk = self.disk
3516     child = self.child
3517     logger.Info("remove mirror component")
3518     self.cfg.SetDiskID(disk, instance.primary_node)
3519     if not rpc.call_blockdev_removechildren(instance.primary_node,
3520                                             disk, [child]):
3521       raise errors.OpExecError("Can't remove child from mirror.")
3522
3523     for node in child.logical_id[:2]:
3524       self.cfg.SetDiskID(child, node)
3525       if not rpc.call_blockdev_remove(node, child):
3526         logger.Error("Warning: failed to remove device from node %s,"
3527                      " continuing operation." % node)
3528
3529     disk.children.remove(child)
3530     self.cfg.AddInstance(instance)
3531
3532
3533 class LUReplaceDisks(LogicalUnit):
3534   """Replace the disks of an instance.
3535
3536   """
3537   HPATH = "mirrors-replace"
3538   HTYPE = constants.HTYPE_INSTANCE
3539   _OP_REQP = ["instance_name", "mode", "disks"]
3540
3541   def BuildHooksEnv(self):
3542     """Build hooks env.
3543
3544     This runs on the master, the primary and all the secondaries.
3545
3546     """
3547     env = {
3548       "MODE": self.op.mode,
3549       "NEW_SECONDARY": self.op.remote_node,
3550       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3551       }
3552     env.update(_BuildInstanceHookEnvByObject(self.instance))
3553     nl = [
3554       self.sstore.GetMasterNode(),
3555       self.instance.primary_node,
3556       ]
3557     if self.op.remote_node is not None:
3558       nl.append(self.op.remote_node)
3559     return env, nl, nl
3560
3561   def CheckPrereq(self):
3562     """Check prerequisites.
3563
3564     This checks that the instance is in the cluster.
3565
3566     """
3567     instance = self.cfg.GetInstanceInfo(
3568       self.cfg.ExpandInstanceName(self.op.instance_name))
3569     if instance is None:
3570       raise errors.OpPrereqError("Instance '%s' not known" %
3571                                  self.op.instance_name)
3572     self.instance = instance
3573     self.op.instance_name = instance.name
3574
3575     if instance.disk_template not in constants.DTS_NET_MIRROR:
3576       raise errors.OpPrereqError("Instance's disk layout is not"
3577                                  " network mirrored.")
3578
3579     if len(instance.secondary_nodes) != 1:
3580       raise errors.OpPrereqError("The instance has a strange layout,"
3581                                  " expected one secondary but found %d" %
3582                                  len(instance.secondary_nodes))
3583
3584     self.sec_node = instance.secondary_nodes[0]
3585
3586     remote_node = getattr(self.op, "remote_node", None)
3587     if remote_node is not None:
3588       remote_node = self.cfg.ExpandNodeName(remote_node)
3589       if remote_node is None:
3590         raise errors.OpPrereqError("Node '%s' not known" %
3591                                    self.op.remote_node)
3592       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3593     else:
3594       self.remote_node_info = None
3595     if remote_node == instance.primary_node:
3596       raise errors.OpPrereqError("The specified node is the primary node of"
3597                                  " the instance.")
3598     elif remote_node == self.sec_node:
3599       if self.op.mode == constants.REPLACE_DISK_SEC:
3600         # this is for DRBD8, where we can't execute the same mode of
3601         # replacement as for drbd7 (no different port allocated)
3602         raise errors.OpPrereqError("Same secondary given, cannot execute"
3603                                    " replacement")
3604       # the user gave the current secondary, switch to
3605       # 'no-replace-secondary' mode for drbd7
3606       remote_node = None
3607     if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3608         self.op.mode != constants.REPLACE_DISK_ALL):
3609       raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3610                                  " disks replacement, not individual ones")
3611     if instance.disk_template == constants.DT_DRBD8:
3612       if (self.op.mode == constants.REPLACE_DISK_ALL and
3613           remote_node is not None):
3614         # switch to replace secondary mode
3615         self.op.mode = constants.REPLACE_DISK_SEC
3616
3617       if self.op.mode == constants.REPLACE_DISK_ALL:
3618         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3619                                    " secondary disk replacement, not"
3620                                    " both at once")
3621       elif self.op.mode == constants.REPLACE_DISK_PRI:
3622         if remote_node is not None:
3623           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3624                                      " the secondary while doing a primary"
3625                                      " node disk replacement")
3626         self.tgt_node = instance.primary_node
3627         self.oth_node = instance.secondary_nodes[0]
3628       elif self.op.mode == constants.REPLACE_DISK_SEC:
3629         self.new_node = remote_node # this can be None, in which case
3630                                     # we don't change the secondary
3631         self.tgt_node = instance.secondary_nodes[0]
3632         self.oth_node = instance.primary_node
3633       else:
3634         raise errors.ProgrammerError("Unhandled disk replace mode")
3635
3636     for name in self.op.disks:
3637       if instance.FindDisk(name) is None:
3638         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3639                                    (name, instance.name))
3640     self.op.remote_node = remote_node
3641
3642   def _ExecRR1(self, feedback_fn):
3643     """Replace the disks of an instance.
3644
3645     """
3646     instance = self.instance
3647     iv_names = {}
3648     # start of work
3649     if self.op.remote_node is None:
3650       remote_node = self.sec_node
3651     else:
3652       remote_node = self.op.remote_node
3653     cfg = self.cfg
3654     for dev in instance.disks:
3655       size = dev.size
3656       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3657       names = _GenerateUniqueNames(cfg, lv_names)
3658       new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3659                                        remote_node, size, names)
3660       iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3661       logger.Info("adding new mirror component on secondary for %s" %
3662                   dev.iv_name)
3663       #HARDCODE
3664       if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3665                                         new_drbd, False,
3666                                         _GetInstanceInfoText(instance)):
3667         raise errors.OpExecError("Failed to create new component on secondary"
3668                                  " node %s. Full abort, cleanup manually!" %
3669                                  remote_node)
3670
3671       logger.Info("adding new mirror component on primary")
3672       #HARDCODE
3673       if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3674                                       instance, new_drbd,
3675                                       _GetInstanceInfoText(instance)):
3676         # remove secondary dev
3677         cfg.SetDiskID(new_drbd, remote_node)
3678         rpc.call_blockdev_remove(remote_node, new_drbd)
3679         raise errors.OpExecError("Failed to create volume on primary!"
3680                                  " Full abort, cleanup manually!!")
3681
3682       # the device exists now
3683       # call the primary node to add the mirror to md
3684       logger.Info("adding new mirror component to md")
3685       if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3686                                            [new_drbd]):
3687         logger.Error("Can't add mirror compoment to md!")
3688         cfg.SetDiskID(new_drbd, remote_node)
3689         if not rpc.call_blockdev_remove(remote_node, new_drbd):
3690           logger.Error("Can't rollback on secondary")
3691         cfg.SetDiskID(new_drbd, instance.primary_node)
3692         if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3693           logger.Error("Can't rollback on primary")
3694         raise errors.OpExecError("Full abort, cleanup manually!!")
3695
3696       dev.children.append(new_drbd)
3697       cfg.AddInstance(instance)
3698
3699     # this can fail as the old devices are degraded and _WaitForSync
3700     # does a combined result over all disks, so we don't check its
3701     # return value
3702     _WaitForSync(cfg, instance, self.proc, unlock=True)
3703
3704     # so check manually all the devices
3705     for name in iv_names:
3706       dev, child, new_drbd = iv_names[name]
3707       cfg.SetDiskID(dev, instance.primary_node)
3708       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3709       if is_degr:
3710         raise errors.OpExecError("MD device %s is degraded!" % name)
3711       cfg.SetDiskID(new_drbd, instance.primary_node)
3712       is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3713       if is_degr:
3714         raise errors.OpExecError("New drbd device %s is degraded!" % name)
3715
3716     for name in iv_names:
3717       dev, child, new_drbd = iv_names[name]
3718       logger.Info("remove mirror %s component" % name)
3719       cfg.SetDiskID(dev, instance.primary_node)
3720       if not rpc.call_blockdev_removechildren(instance.primary_node,
3721                                               dev, [child]):
3722         logger.Error("Can't remove child from mirror, aborting"
3723                      " *this device cleanup*.\nYou need to cleanup manually!!")
3724         continue
3725
3726       for node in child.logical_id[:2]:
3727         logger.Info("remove child device on %s" % node)
3728         cfg.SetDiskID(child, node)
3729         if not rpc.call_blockdev_remove(node, child):
3730           logger.Error("Warning: failed to remove device from node %s,"
3731                        " continuing operation." % node)
3732
3733       dev.children.remove(child)
3734
3735       cfg.AddInstance(instance)
3736
3737   def _ExecD8DiskOnly(self, feedback_fn):
3738     """Replace a disk on the primary or secondary for dbrd8.
3739
3740     The algorithm for replace is quite complicated:
3741       - for each disk to be replaced:
3742         - create new LVs on the target node with unique names
3743         - detach old LVs from the drbd device
3744         - rename old LVs to name_replaced.<time_t>
3745         - rename new LVs to old LVs
3746         - attach the new LVs (with the old names now) to the drbd device
3747       - wait for sync across all devices
3748       - for each modified disk:
3749         - remove old LVs (which have the name name_replaces.<time_t>)
3750
3751     Failures are not very well handled.
3752
3753     """
3754     steps_total = 6
3755     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3756     instance = self.instance
3757     iv_names = {}
3758     vgname = self.cfg.GetVGName()
3759     # start of work
3760     cfg = self.cfg
3761     tgt_node = self.tgt_node
3762     oth_node = self.oth_node
3763
3764     # Step: check device activation
3765     self.proc.LogStep(1, steps_total, "check device existence")
3766     info("checking volume groups")
3767     my_vg = cfg.GetVGName()
3768     results = rpc.call_vg_list([oth_node, tgt_node])
3769     if not results:
3770       raise errors.OpExecError("Can't list volume groups on the nodes")
3771     for node in oth_node, tgt_node:
3772       res = results.get(node, False)
3773       if not res or my_vg not in res:
3774         raise errors.OpExecError("Volume group '%s' not found on %s" %
3775                                  (my_vg, node))
3776     for dev in instance.disks:
3777       if not dev.iv_name in self.op.disks:
3778         continue
3779       for node in tgt_node, oth_node:
3780         info("checking %s on %s" % (dev.iv_name, node))
3781         cfg.SetDiskID(dev, node)
3782         if not rpc.call_blockdev_find(node, dev):
3783           raise errors.OpExecError("Can't find device %s on node %s" %
3784                                    (dev.iv_name, node))
3785
3786     # Step: check other node consistency
3787     self.proc.LogStep(2, steps_total, "check peer consistency")
3788     for dev in instance.disks:
3789       if not dev.iv_name in self.op.disks:
3790         continue
3791       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3792       if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3793                                    oth_node==instance.primary_node):
3794         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3795                                  " to replace disks on this node (%s)" %
3796                                  (oth_node, tgt_node))
3797
3798     # Step: create new storage
3799     self.proc.LogStep(3, steps_total, "allocate new storage")
3800     for dev in instance.disks:
3801       if not dev.iv_name in self.op.disks:
3802         continue
3803       size = dev.size
3804       cfg.SetDiskID(dev, tgt_node)
3805       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3806       names = _GenerateUniqueNames(cfg, lv_names)
3807       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3808                              logical_id=(vgname, names[0]))
3809       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3810                              logical_id=(vgname, names[1]))
3811       new_lvs = [lv_data, lv_meta]
3812       old_lvs = dev.children
3813       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3814       info("creating new local storage on %s for %s" %
3815            (tgt_node, dev.iv_name))
3816       # since we *always* want to create this LV, we use the
3817       # _Create...OnPrimary (which forces the creation), even if we
3818       # are talking about the secondary node
3819       for new_lv in new_lvs:
3820         if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3821                                         _GetInstanceInfoText(instance)):
3822           raise errors.OpExecError("Failed to create new LV named '%s' on"
3823                                    " node '%s'" %
3824                                    (new_lv.logical_id[1], tgt_node))
3825
3826     # Step: for each lv, detach+rename*2+attach
3827     self.proc.LogStep(4, steps_total, "change drbd configuration")
3828     for dev, old_lvs, new_lvs in iv_names.itervalues():
3829       info("detaching %s drbd from local storage" % dev.iv_name)
3830       if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3831         raise errors.OpExecError("Can't detach drbd from local storage on node"
3832                                  " %s for device %s" % (tgt_node, dev.iv_name))
3833       #dev.children = []
3834       #cfg.Update(instance)
3835
3836       # ok, we created the new LVs, so now we know we have the needed
3837       # storage; as such, we proceed on the target node to rename
3838       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3839       # using the assumption that logical_id == physical_id (which in
3840       # turn is the unique_id on that node)
3841
3842       # FIXME(iustin): use a better name for the replaced LVs
3843       temp_suffix = int(time.time())
3844       ren_fn = lambda d, suff: (d.physical_id[0],
3845                                 d.physical_id[1] + "_replaced-%s" % suff)
3846       # build the rename list based on what LVs exist on the node
3847       rlist = []
3848       for to_ren in old_lvs:
3849         find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3850         if find_res is not None: # device exists
3851           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3852
3853       info("renaming the old LVs on the target node")
3854       if not rpc.call_blockdev_rename(tgt_node, rlist):
3855         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3856       # now we rename the new LVs to the old LVs
3857       info("renaming the new LVs on the target node")
3858       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3859       if not rpc.call_blockdev_rename(tgt_node, rlist):
3860         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3861
3862       for old, new in zip(old_lvs, new_lvs):
3863         new.logical_id = old.logical_id
3864         cfg.SetDiskID(new, tgt_node)
3865
3866       for disk in old_lvs:
3867         disk.logical_id = ren_fn(disk, temp_suffix)
3868         cfg.SetDiskID(disk, tgt_node)
3869
3870       # now that the new lvs have the old name, we can add them to the device
3871       info("adding new mirror component on %s" % tgt_node)
3872       if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3873         for new_lv in new_lvs:
3874           if not rpc.call_blockdev_remove(tgt_node, new_lv):
3875             warning("Can't rollback device %s", hint="manually cleanup unused"
3876                     " logical volumes")
3877         raise errors.OpExecError("Can't add local storage to drbd")
3878
3879       dev.children = new_lvs
3880       cfg.Update(instance)
3881
3882     # Step: wait for sync
3883
3884     # this can fail as the old devices are degraded and _WaitForSync
3885     # does a combined result over all disks, so we don't check its
3886     # return value
3887     self.proc.LogStep(5, steps_total, "sync devices")
3888     _WaitForSync(cfg, instance, self.proc, unlock=True)
3889
3890     # so check manually all the devices
3891     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3892       cfg.SetDiskID(dev, instance.primary_node)
3893       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3894       if is_degr:
3895         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3896
3897     # Step: remove old storage
3898     self.proc.LogStep(6, steps_total, "removing old storage")
3899     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3900       info("remove logical volumes for %s" % name)
3901       for lv in old_lvs:
3902         cfg.SetDiskID(lv, tgt_node)
3903         if not rpc.call_blockdev_remove(tgt_node, lv):
3904           warning("Can't remove old LV", hint="manually remove unused LVs")
3905           continue
3906
3907   def _ExecD8Secondary(self, feedback_fn):
3908     """Replace the secondary node for drbd8.
3909
3910     The algorithm for replace is quite complicated:
3911       - for all disks of the instance:
3912         - create new LVs on the new node with same names
3913         - shutdown the drbd device on the old secondary
3914         - disconnect the drbd network on the primary
3915         - create the drbd device on the new secondary
3916         - network attach the drbd on the primary, using an artifice:
3917           the drbd code for Attach() will connect to the network if it
3918           finds a device which is connected to the good local disks but
3919           not network enabled
3920       - wait for sync across all devices
3921       - remove all disks from the old secondary
3922
3923     Failures are not very well handled.
3924
3925     """
3926     steps_total = 6
3927     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3928     instance = self.instance
3929     iv_names = {}
3930     vgname = self.cfg.GetVGName()
3931     # start of work
3932     cfg = self.cfg
3933     old_node = self.tgt_node
3934     new_node = self.new_node
3935     pri_node = instance.primary_node
3936
3937     # Step: check device activation
3938     self.proc.LogStep(1, steps_total, "check device existence")
3939     info("checking volume groups")
3940     my_vg = cfg.GetVGName()
3941     results = rpc.call_vg_list([pri_node, new_node])
3942     if not results:
3943       raise errors.OpExecError("Can't list volume groups on the nodes")
3944     for node in pri_node, new_node:
3945       res = results.get(node, False)
3946       if not res or my_vg not in res:
3947         raise errors.OpExecError("Volume group '%s' not found on %s" %
3948                                  (my_vg, node))
3949     for dev in instance.disks:
3950       if not dev.iv_name in self.op.disks:
3951         continue
3952       info("checking %s on %s" % (dev.iv_name, pri_node))
3953       cfg.SetDiskID(dev, pri_node)
3954       if not rpc.call_blockdev_find(pri_node, dev):
3955         raise errors.OpExecError("Can't find device %s on node %s" %
3956                                  (dev.iv_name, pri_node))
3957
3958     # Step: check other node consistency
3959     self.proc.LogStep(2, steps_total, "check peer consistency")
3960     for dev in instance.disks:
3961       if not dev.iv_name in self.op.disks:
3962         continue
3963       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3964       if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3965         raise errors.OpExecError("Primary node (%s) has degraded storage,"
3966                                  " unsafe to replace the secondary" %
3967                                  pri_node)
3968
3969     # Step: create new storage
3970     self.proc.LogStep(3, steps_total, "allocate new storage")
3971     for dev in instance.disks:
3972       size = dev.size
3973       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3974       # since we *always* want to create this LV, we use the
3975       # _Create...OnPrimary (which forces the creation), even if we
3976       # are talking about the secondary node
3977       for new_lv in dev.children:
3978         if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3979                                         _GetInstanceInfoText(instance)):
3980           raise errors.OpExecError("Failed to create new LV named '%s' on"
3981                                    " node '%s'" %
3982                                    (new_lv.logical_id[1], new_node))
3983
3984       iv_names[dev.iv_name] = (dev, dev.children)
3985
3986     self.proc.LogStep(4, steps_total, "changing drbd configuration")
3987     for dev in instance.disks:
3988       size = dev.size
3989       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3990       # create new devices on new_node
3991       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3992                               logical_id=(pri_node, new_node,
3993                                           dev.logical_id[2]),
3994                               children=dev.children)
3995       if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3996                                         new_drbd, False,
3997                                       _GetInstanceInfoText(instance)):
3998         raise errors.OpExecError("Failed to create new DRBD on"
3999                                  " node '%s'" % new_node)
4000
4001     for dev in instance.disks:
4002       # we have new devices, shutdown the drbd on the old secondary
4003       info("shutting down drbd for %s on old node" % dev.iv_name)
4004       cfg.SetDiskID(dev, old_node)
4005       if not rpc.call_blockdev_shutdown(old_node, dev):
4006         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4007                 hint="Please cleanup this device manually as soon as possible")
4008
4009     info("detaching primary drbds from the network (=> standalone)")
4010     done = 0
4011     for dev in instance.disks:
4012       cfg.SetDiskID(dev, pri_node)
4013       # set the physical (unique in bdev terms) id to None, meaning
4014       # detach from network
4015       dev.physical_id = (None,) * len(dev.physical_id)
4016       # and 'find' the device, which will 'fix' it to match the
4017       # standalone state
4018       if rpc.call_blockdev_find(pri_node, dev):
4019         done += 1
4020       else:
4021         warning("Failed to detach drbd %s from network, unusual case" %
4022                 dev.iv_name)
4023
4024     if not done:
4025       # no detaches succeeded (very unlikely)
4026       raise errors.OpExecError("Can't detach at least one DRBD from old node")
4027
4028     # if we managed to detach at least one, we update all the disks of
4029     # the instance to point to the new secondary
4030     info("updating instance configuration")
4031     for dev in instance.disks:
4032       dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
4033       cfg.SetDiskID(dev, pri_node)
4034     cfg.Update(instance)
4035
4036     # and now perform the drbd attach
4037     info("attaching primary drbds to new secondary (standalone => connected)")
4038     failures = []
4039     for dev in instance.disks:
4040       info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4041       # since the attach is smart, it's enough to 'find' the device,
4042       # it will automatically activate the network, if the physical_id
4043       # is correct
4044       cfg.SetDiskID(dev, pri_node)
4045       if not rpc.call_blockdev_find(pri_node, dev):
4046         warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4047                 "please do a gnt-instance info to see the status of disks")
4048
4049     # this can fail as the old devices are degraded and _WaitForSync
4050     # does a combined result over all disks, so we don't check its
4051     # return value
4052     self.proc.LogStep(5, steps_total, "sync devices")
4053     _WaitForSync(cfg, instance, self.proc, unlock=True)
4054
4055     # so check manually all the devices
4056     for name, (dev, old_lvs) in iv_names.iteritems():
4057       cfg.SetDiskID(dev, pri_node)
4058       is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
4059       if is_degr:
4060         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4061
4062     self.proc.LogStep(6, steps_total, "removing old storage")
4063     for name, (dev, old_lvs) in iv_names.iteritems():
4064       info("remove logical volumes for %s" % name)
4065       for lv in old_lvs:
4066         cfg.SetDiskID(lv, old_node)
4067         if not rpc.call_blockdev_remove(old_node, lv):
4068           warning("Can't remove LV on old secondary",
4069                   hint="Cleanup stale volumes by hand")
4070
4071   def Exec(self, feedback_fn):
4072     """Execute disk replacement.
4073
4074     This dispatches the disk replacement to the appropriate handler.
4075
4076     """
4077     instance = self.instance
4078     if instance.disk_template == constants.DT_REMOTE_RAID1:
4079       fn = self._ExecRR1
4080     elif instance.disk_template == constants.DT_DRBD8:
4081       if self.op.remote_node is None:
4082         fn = self._ExecD8DiskOnly
4083       else:
4084         fn = self._ExecD8Secondary
4085     else:
4086       raise errors.ProgrammerError("Unhandled disk replacement case")
4087     return fn(feedback_fn)
4088
4089
4090 class LUQueryInstanceData(NoHooksLU):
4091   """Query runtime instance data.
4092
4093   """
4094   _OP_REQP = ["instances"]
4095
4096   def CheckPrereq(self):
4097     """Check prerequisites.
4098
4099     This only checks the optional instance list against the existing names.
4100
4101     """
4102     if not isinstance(self.op.instances, list):
4103       raise errors.OpPrereqError("Invalid argument type 'instances'")
4104     if self.op.instances:
4105       self.wanted_instances = []
4106       names = self.op.instances
4107       for name in names:
4108         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4109         if instance is None:
4110           raise errors.OpPrereqError("No such instance name '%s'" % name)
4111         self.wanted_instances.append(instance)
4112     else:
4113       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4114                                in self.cfg.GetInstanceList()]
4115     return
4116
4117
4118   def _ComputeDiskStatus(self, instance, snode, dev):
4119     """Compute block device status.
4120
4121     """
4122     self.cfg.SetDiskID(dev, instance.primary_node)
4123     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4124     if dev.dev_type in constants.LDS_DRBD:
4125       # we change the snode then (otherwise we use the one passed in)
4126       if dev.logical_id[0] == instance.primary_node:
4127         snode = dev.logical_id[1]
4128       else:
4129         snode = dev.logical_id[0]
4130
4131     if snode:
4132       self.cfg.SetDiskID(dev, snode)
4133       dev_sstatus = rpc.call_blockdev_find(snode, dev)
4134     else:
4135       dev_sstatus = None
4136
4137     if dev.children:
4138       dev_children = [self._ComputeDiskStatus(instance, snode, child)
4139                       for child in dev.children]
4140     else:
4141       dev_children = []
4142
4143     data = {
4144       "iv_name": dev.iv_name,
4145       "dev_type": dev.dev_type,
4146       "logical_id": dev.logical_id,
4147       "physical_id": dev.physical_id,
4148       "pstatus": dev_pstatus,
4149       "sstatus": dev_sstatus,
4150       "children": dev_children,
4151       }
4152
4153     return data
4154
4155   def Exec(self, feedback_fn):
4156     """Gather and return data"""
4157     result = {}
4158     for instance in self.wanted_instances:
4159       remote_info = rpc.call_instance_info(instance.primary_node,
4160                                                 instance.name)
4161       if remote_info and "state" in remote_info:
4162         remote_state = "up"
4163       else:
4164         remote_state = "down"
4165       if instance.status == "down":
4166         config_state = "down"
4167       else:
4168         config_state = "up"
4169
4170       disks = [self._ComputeDiskStatus(instance, None, device)
4171                for device in instance.disks]
4172
4173       idict = {
4174         "name": instance.name,
4175         "config_state": config_state,
4176         "run_state": remote_state,
4177         "pnode": instance.primary_node,
4178         "snodes": instance.secondary_nodes,
4179         "os": instance.os,
4180         "memory": instance.memory,
4181         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4182         "disks": disks,
4183         "network_port": instance.network_port,
4184         "vcpus": instance.vcpus,
4185         "kernel_path": instance.kernel_path,
4186         "initrd_path": instance.initrd_path,
4187         "hvm_boot_order": instance.hvm_boot_order,
4188         }
4189
4190       result[instance.name] = idict
4191
4192     return result
4193
4194
4195 class LUSetInstanceParms(LogicalUnit):
4196   """Modifies an instances's parameters.
4197
4198   """
4199   HPATH = "instance-modify"
4200   HTYPE = constants.HTYPE_INSTANCE
4201   _OP_REQP = ["instance_name"]
4202
4203   def BuildHooksEnv(self):
4204     """Build hooks env.
4205
4206     This runs on the master, primary and secondaries.
4207
4208     """
4209     args = dict()
4210     if self.mem:
4211       args['memory'] = self.mem
4212     if self.vcpus:
4213       args['vcpus'] = self.vcpus
4214     if self.do_ip or self.do_bridge or self.mac:
4215       if self.do_ip:
4216         ip = self.ip
4217       else:
4218         ip = self.instance.nics[0].ip
4219       if self.bridge:
4220         bridge = self.bridge
4221       else:
4222         bridge = self.instance.nics[0].bridge
4223       if self.mac:
4224         mac = self.mac
4225       else:
4226         mac = self.instance.nics[0].mac
4227       args['nics'] = [(ip, bridge, mac)]
4228     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4229     nl = [self.sstore.GetMasterNode(),
4230           self.instance.primary_node] + list(self.instance.secondary_nodes)
4231     return env, nl, nl
4232
4233   def CheckPrereq(self):
4234     """Check prerequisites.
4235
4236     This only checks the instance list against the existing names.
4237
4238     """
4239     self.mem = getattr(self.op, "mem", None)
4240     self.vcpus = getattr(self.op, "vcpus", None)
4241     self.ip = getattr(self.op, "ip", None)
4242     self.mac = getattr(self.op, "mac", None)
4243     self.bridge = getattr(self.op, "bridge", None)
4244     self.kernel_path = getattr(self.op, "kernel_path", None)
4245     self.initrd_path = getattr(self.op, "initrd_path", None)
4246     self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4247     all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4248                  self.kernel_path, self.initrd_path, self.hvm_boot_order]
4249     if all_parms.count(None) == len(all_parms):
4250       raise errors.OpPrereqError("No changes submitted")
4251     if self.mem is not None:
4252       try:
4253         self.mem = int(self.mem)
4254       except ValueError, err:
4255         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4256     if self.vcpus is not None:
4257       try:
4258         self.vcpus = int(self.vcpus)
4259       except ValueError, err:
4260         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4261     if self.ip is not None:
4262       self.do_ip = True
4263       if self.ip.lower() == "none":
4264         self.ip = None
4265       else:
4266         if not utils.IsValidIP(self.ip):
4267           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4268     else:
4269       self.do_ip = False
4270     self.do_bridge = (self.bridge is not None)
4271     if self.mac is not None:
4272       if self.cfg.IsMacInUse(self.mac):
4273         raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4274                                    self.mac)
4275       if not utils.IsValidMac(self.mac):
4276         raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4277
4278     if self.kernel_path is not None:
4279       self.do_kernel_path = True
4280       if self.kernel_path == constants.VALUE_NONE:
4281         raise errors.OpPrereqError("Can't set instance to no kernel")
4282
4283       if self.kernel_path != constants.VALUE_DEFAULT:
4284         if not os.path.isabs(self.kernel_path):
4285           raise errors.OpPrereqError("The kernel path must be an absolute"
4286                                     " filename")
4287     else:
4288       self.do_kernel_path = False
4289
4290     if self.initrd_path is not None:
4291       self.do_initrd_path = True
4292       if self.initrd_path not in (constants.VALUE_NONE,
4293                                   constants.VALUE_DEFAULT):
4294         if not os.path.isabs(self.initrd_path):
4295           raise errors.OpPrereqError("The initrd path must be an absolute"
4296                                     " filename")
4297     else:
4298       self.do_initrd_path = False
4299
4300     # boot order verification
4301     if self.hvm_boot_order is not None:
4302       if self.hvm_boot_order != constants.VALUE_DEFAULT:
4303         if len(self.hvm_boot_order.strip("acdn")) != 0:
4304           raise errors.OpPrereqError("invalid boot order specified,"
4305                                      " must be one or more of [acdn]"
4306                                      " or 'default'")
4307
4308     instance = self.cfg.GetInstanceInfo(
4309       self.cfg.ExpandInstanceName(self.op.instance_name))
4310     if instance is None:
4311       raise errors.OpPrereqError("No such instance name '%s'" %
4312                                  self.op.instance_name)
4313     self.op.instance_name = instance.name
4314     self.instance = instance
4315     return
4316
4317   def Exec(self, feedback_fn):
4318     """Modifies an instance.
4319
4320     All parameters take effect only at the next restart of the instance.
4321     """
4322     result = []
4323     instance = self.instance
4324     if self.mem:
4325       instance.memory = self.mem
4326       result.append(("mem", self.mem))
4327     if self.vcpus:
4328       instance.vcpus = self.vcpus
4329       result.append(("vcpus",  self.vcpus))
4330     if self.do_ip:
4331       instance.nics[0].ip = self.ip
4332       result.append(("ip", self.ip))
4333     if self.bridge:
4334       instance.nics[0].bridge = self.bridge
4335       result.append(("bridge", self.bridge))
4336     if self.mac:
4337       instance.nics[0].mac = self.mac
4338       result.append(("mac", self.mac))
4339     if self.do_kernel_path:
4340       instance.kernel_path = self.kernel_path
4341       result.append(("kernel_path", self.kernel_path))
4342     if self.do_initrd_path:
4343       instance.initrd_path = self.initrd_path
4344       result.append(("initrd_path", self.initrd_path))
4345     if self.hvm_boot_order:
4346       if self.hvm_boot_order == constants.VALUE_DEFAULT:
4347         instance.hvm_boot_order = None
4348       else:
4349         instance.hvm_boot_order = self.hvm_boot_order
4350       result.append(("hvm_boot_order", self.hvm_boot_order))
4351
4352     self.cfg.AddInstance(instance)
4353
4354     return result
4355
4356
4357 class LUQueryExports(NoHooksLU):
4358   """Query the exports list
4359
4360   """
4361   _OP_REQP = []
4362
4363   def CheckPrereq(self):
4364     """Check that the nodelist contains only existing nodes.
4365
4366     """
4367     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4368
4369   def Exec(self, feedback_fn):
4370     """Compute the list of all the exported system images.
4371
4372     Returns:
4373       a dictionary with the structure node->(export-list)
4374       where export-list is a list of the instances exported on
4375       that node.
4376
4377     """
4378     return rpc.call_export_list(self.nodes)
4379
4380
4381 class LUExportInstance(LogicalUnit):
4382   """Export an instance to an image in the cluster.
4383
4384   """
4385   HPATH = "instance-export"
4386   HTYPE = constants.HTYPE_INSTANCE
4387   _OP_REQP = ["instance_name", "target_node", "shutdown"]
4388
4389   def BuildHooksEnv(self):
4390     """Build hooks env.
4391
4392     This will run on the master, primary node and target node.
4393
4394     """
4395     env = {
4396       "EXPORT_NODE": self.op.target_node,
4397       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4398       }
4399     env.update(_BuildInstanceHookEnvByObject(self.instance))
4400     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4401           self.op.target_node]
4402     return env, nl, nl
4403
4404   def CheckPrereq(self):
4405     """Check prerequisites.
4406
4407     This checks that the instance name is a valid one.
4408
4409     """
4410     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4411     self.instance = self.cfg.GetInstanceInfo(instance_name)
4412     if self.instance is None:
4413       raise errors.OpPrereqError("Instance '%s' not found" %
4414                                  self.op.instance_name)
4415
4416     # node verification
4417     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4418     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4419
4420     if self.dst_node is None:
4421       raise errors.OpPrereqError("Destination node '%s' is unknown." %
4422                                  self.op.target_node)
4423     self.op.target_node = self.dst_node.name
4424
4425   def Exec(self, feedback_fn):
4426     """Export an instance to an image in the cluster.
4427
4428     """
4429     instance = self.instance
4430     dst_node = self.dst_node
4431     src_node = instance.primary_node
4432     if self.op.shutdown:
4433       # shutdown the instance, but not the disks
4434       if not rpc.call_instance_shutdown(src_node, instance):
4435          raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4436                                  (instance.name, source_node))
4437
4438     vgname = self.cfg.GetVGName()
4439
4440     snap_disks = []
4441
4442     try:
4443       for disk in instance.disks:
4444         if disk.iv_name == "sda":
4445           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4446           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4447
4448           if not new_dev_name:
4449             logger.Error("could not snapshot block device %s on node %s" %
4450                          (disk.logical_id[1], src_node))
4451           else:
4452             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4453                                       logical_id=(vgname, new_dev_name),
4454                                       physical_id=(vgname, new_dev_name),
4455                                       iv_name=disk.iv_name)
4456             snap_disks.append(new_dev)
4457
4458     finally:
4459       if self.op.shutdown and instance.status == "up":
4460         if not rpc.call_instance_start(src_node, instance, None):
4461           _ShutdownInstanceDisks(instance, self.cfg)
4462           raise errors.OpExecError("Could not start instance")
4463
4464     # TODO: check for size
4465
4466     for dev in snap_disks:
4467       if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
4468                                            instance):
4469         logger.Error("could not export block device %s from node"
4470                      " %s to node %s" %
4471                      (dev.logical_id[1], src_node, dst_node.name))
4472       if not rpc.call_blockdev_remove(src_node, dev):
4473         logger.Error("could not remove snapshot block device %s from"
4474                      " node %s" % (dev.logical_id[1], src_node))
4475
4476     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4477       logger.Error("could not finalize export for instance %s on node %s" %
4478                    (instance.name, dst_node.name))
4479
4480     nodelist = self.cfg.GetNodeList()
4481     nodelist.remove(dst_node.name)
4482
4483     # on one-node clusters nodelist will be empty after the removal
4484     # if we proceed the backup would be removed because OpQueryExports
4485     # substitutes an empty list with the full cluster node list.
4486     if nodelist:
4487       op = opcodes.OpQueryExports(nodes=nodelist)
4488       exportlist = self.proc.ChainOpCode(op)
4489       for node in exportlist:
4490         if instance.name in exportlist[node]:
4491           if not rpc.call_export_remove(node, instance.name):
4492             logger.Error("could not remove older export for instance %s"
4493                          " on node %s" % (instance.name, node))
4494
4495
4496 class TagsLU(NoHooksLU):
4497   """Generic tags LU.
4498
4499   This is an abstract class which is the parent of all the other tags LUs.
4500
4501   """
4502   def CheckPrereq(self):
4503     """Check prerequisites.
4504
4505     """
4506     if self.op.kind == constants.TAG_CLUSTER:
4507       self.target = self.cfg.GetClusterInfo()
4508     elif self.op.kind == constants.TAG_NODE:
4509       name = self.cfg.ExpandNodeName(self.op.name)
4510       if name is None:
4511         raise errors.OpPrereqError("Invalid node name (%s)" %
4512                                    (self.op.name,))
4513       self.op.name = name
4514       self.target = self.cfg.GetNodeInfo(name)
4515     elif self.op.kind == constants.TAG_INSTANCE:
4516       name = self.cfg.ExpandInstanceName(self.op.name)
4517       if name is None:
4518         raise errors.OpPrereqError("Invalid instance name (%s)" %
4519                                    (self.op.name,))
4520       self.op.name = name
4521       self.target = self.cfg.GetInstanceInfo(name)
4522     else:
4523       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4524                                  str(self.op.kind))
4525
4526
4527 class LUGetTags(TagsLU):
4528   """Returns the tags of a given object.
4529
4530   """
4531   _OP_REQP = ["kind", "name"]
4532
4533   def Exec(self, feedback_fn):
4534     """Returns the tag list.
4535
4536     """
4537     return self.target.GetTags()
4538
4539
4540 class LUSearchTags(NoHooksLU):
4541   """Searches the tags for a given pattern.
4542
4543   """
4544   _OP_REQP = ["pattern"]
4545
4546   def CheckPrereq(self):
4547     """Check prerequisites.
4548
4549     This checks the pattern passed for validity by compiling it.
4550
4551     """
4552     try:
4553       self.re = re.compile(self.op.pattern)
4554     except re.error, err:
4555       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4556                                  (self.op.pattern, err))
4557
4558   def Exec(self, feedback_fn):
4559     """Returns the tag list.
4560
4561     """
4562     cfg = self.cfg
4563     tgts = [("/cluster", cfg.GetClusterInfo())]
4564     ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4565     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4566     nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4567     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4568     results = []
4569     for path, target in tgts:
4570       for tag in target.GetTags():
4571         if self.re.search(tag):
4572           results.append((path, tag))
4573     return results
4574
4575
4576 class LUAddTags(TagsLU):
4577   """Sets a tag on a given object.
4578
4579   """
4580   _OP_REQP = ["kind", "name", "tags"]
4581
4582   def CheckPrereq(self):
4583     """Check prerequisites.
4584
4585     This checks the type and length of the tag name and value.
4586
4587     """
4588     TagsLU.CheckPrereq(self)
4589     for tag in self.op.tags:
4590       objects.TaggableObject.ValidateTag(tag)
4591
4592   def Exec(self, feedback_fn):
4593     """Sets the tag.
4594
4595     """
4596     try:
4597       for tag in self.op.tags:
4598         self.target.AddTag(tag)
4599     except errors.TagError, err:
4600       raise errors.OpExecError("Error while setting tag: %s" % str(err))
4601     try:
4602       self.cfg.Update(self.target)
4603     except errors.ConfigurationError:
4604       raise errors.OpRetryError("There has been a modification to the"
4605                                 " config file and the operation has been"
4606                                 " aborted. Please retry.")
4607
4608
4609 class LUDelTags(TagsLU):
4610   """Delete a list of tags from a given object.
4611
4612   """
4613   _OP_REQP = ["kind", "name", "tags"]
4614
4615   def CheckPrereq(self):
4616     """Check prerequisites.
4617
4618     This checks that we have the given tag.
4619
4620     """
4621     TagsLU.CheckPrereq(self)
4622     for tag in self.op.tags:
4623       objects.TaggableObject.ValidateTag(tag)
4624     del_tags = frozenset(self.op.tags)
4625     cur_tags = self.target.GetTags()
4626     if not del_tags <= cur_tags:
4627       diff_tags = del_tags - cur_tags
4628       diff_names = ["'%s'" % tag for tag in diff_tags]
4629       diff_names.sort()
4630       raise errors.OpPrereqError("Tag(s) %s not found" %
4631                                  (",".join(diff_names)))
4632
4633   def Exec(self, feedback_fn):
4634     """Remove the tag from the object.
4635
4636     """
4637     for tag in self.op.tags:
4638       self.target.RemoveTag(tag)
4639     try:
4640       self.cfg.Update(self.target)
4641     except errors.ConfigurationError:
4642       raise errors.OpRetryError("There has been a modification to the"
4643                                 " config file and the operation has been"
4644                                 " aborted. Please retry.")
4645
4646 class LUTestDelay(NoHooksLU):
4647   """Sleep for a specified amount of time.
4648
4649   This LU sleeps on the master and/or nodes for a specified amoutn of
4650   time.
4651
4652   """
4653   _OP_REQP = ["duration", "on_master", "on_nodes"]
4654
4655   def CheckPrereq(self):
4656     """Check prerequisites.
4657
4658     This checks that we have a good list of nodes and/or the duration
4659     is valid.
4660
4661     """
4662
4663     if self.op.on_nodes:
4664       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
4665
4666   def Exec(self, feedback_fn):
4667     """Do the actual sleep.
4668
4669     """
4670     if self.op.on_master:
4671       if not utils.TestDelay(self.op.duration):
4672         raise errors.OpExecError("Error during master delay test")
4673     if self.op.on_nodes:
4674       result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
4675       if not result:
4676         raise errors.OpExecError("Complete failure from rpc call")
4677       for node, node_result in result.items():
4678         if not node_result:
4679           raise errors.OpExecError("Failure during rpc call to node %s,"
4680                                    " result: %s" % (node, node_result))
4681
4682
4683 def _AllocatorGetClusterData(cfg, sstore):
4684   """Compute the generic allocator input data.
4685
4686   This is the data that is independent of the actual operation.
4687
4688   """
4689   # cluster data
4690   data = {
4691     "version": 1,
4692     "cluster_name": sstore.GetClusterName(),
4693     "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
4694     # we don't have job IDs
4695     }
4696
4697   # node data
4698   node_results = {}
4699   node_list = cfg.GetNodeList()
4700   node_data = rpc.call_node_info(node_list, cfg.GetVGName())
4701   for nname in node_list:
4702     ninfo = cfg.GetNodeInfo(nname)
4703     if nname not in node_data or not isinstance(node_data[nname], dict):
4704       raise errors.OpExecError("Can't get data for node %s" % nname)
4705     remote_info = node_data[nname]
4706     for attr in ['memory_total', 'memory_free',
4707                  'vg_size', 'vg_free']:
4708       if attr not in remote_info:
4709         raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
4710                                  (nname, attr))
4711       try:
4712         int(remote_info[attr])
4713       except ValueError, err:
4714         raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
4715                                  " %s" % (nname, attr, str(err)))
4716     pnr = {
4717       "tags": list(ninfo.GetTags()),
4718       "total_memory": utils.TryConvert(int, remote_info['memory_total']),
4719       "free_memory": utils.TryConvert(int, remote_info['memory_free']),
4720       "total_disk": utils.TryConvert(int, remote_info['vg_size']),
4721       "free_disk": utils.TryConvert(int, remote_info['vg_free']),
4722       "primary_ip": ninfo.primary_ip,
4723       "secondary_ip": ninfo.secondary_ip,
4724       }
4725     node_results[nname] = pnr
4726   data["nodes"] = node_results
4727
4728   # instance data
4729   instance_data = {}
4730   i_list = cfg.GetInstanceList()
4731   for iname in i_list:
4732     iinfo = cfg.GetInstanceInfo(iname)
4733     nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
4734                 for n in iinfo.nics]
4735     pir = {
4736       "tags": list(iinfo.GetTags()),
4737       "should_run": iinfo.status == "up",
4738       "vcpus": iinfo.vcpus,
4739       "memory": iinfo.memory,
4740       "os": iinfo.os,
4741       "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
4742       "nics": nic_data,
4743       "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
4744       "disk_template": iinfo.disk_template,
4745       }
4746     instance_data[iname] = pir
4747
4748   data["instances"] = instance_data
4749
4750   return data
4751
4752
4753 def _AllocatorAddNewInstance(data, op):
4754   """Add new instance data to allocator structure.
4755
4756   This in combination with _AllocatorGetClusterData will create the
4757   correct structure needed as input for the allocator.
4758
4759   The checks for the completeness of the opcode must have already been
4760   done.
4761
4762   """
4763   request = {
4764     "type": "allocate",
4765     "name": op.name,
4766     "disk_template": op.disk_template,
4767     "tags": op.tags,
4768     "os": op.os,
4769     "vcpus": op.vcpus,
4770     "memory": op.mem_size,
4771     "disks": op.disks,
4772     "nics": op.nics,
4773     }
4774   data["request"] = request
4775
4776
4777 def _AllocatorAddRelocateInstance(data, op):
4778   """Add relocate instance data to allocator structure.
4779
4780   This in combination with _AllocatorGetClusterData will create the
4781   correct structure needed as input for the allocator.
4782
4783   The checks for the completeness of the opcode must have already been
4784   done.
4785
4786   """
4787   request = {
4788     "type": "replace_secondary",
4789     "name": op.name,
4790     }
4791   data["request"] = request
4792
4793
4794 class LUTestAllocator(NoHooksLU):
4795   """Run allocator tests.
4796
4797   This LU runs the allocator tests
4798
4799   """
4800   _OP_REQP = ["direction", "mode", "name"]
4801
4802   def CheckPrereq(self):
4803     """Check prerequisites.
4804
4805     This checks the opcode parameters depending on the director and mode test.
4806
4807     """
4808     if self.op.mode == constants.ALF_MODE_ALLOC:
4809       for attr in ["name", "mem_size", "disks", "disk_template",
4810                    "os", "tags", "nics", "vcpus"]:
4811         if not hasattr(self.op, attr):
4812           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
4813                                      attr)
4814       iname = self.cfg.ExpandInstanceName(self.op.name)
4815       if iname is not None:
4816         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
4817                                    iname)
4818       if not isinstance(self.op.nics, list):
4819         raise errors.OpPrereqError("Invalid parameter 'nics'")
4820       for row in self.op.nics:
4821         if (not isinstance(row, dict) or
4822             "mac" not in row or
4823             "ip" not in row or
4824             "bridge" not in row):
4825           raise errors.OpPrereqError("Invalid contents of the"
4826                                      " 'nics' parameter")
4827       if not isinstance(self.op.disks, list):
4828         raise errors.OpPrereqError("Invalid parameter 'disks'")
4829       for row in self.op.disks:
4830         if (not isinstance(row, dict) or
4831             "size" not in row or
4832             not isinstance(row["size"], int) or
4833             "mode" not in row or
4834             row["mode"] not in ['r', 'w']):
4835           raise errors.OpPrereqError("Invalid contents of the"
4836                                      " 'disks' parameter")
4837     elif self.op.mode == constants.ALF_MODE_RELOC:
4838       if not hasattr(self.op, "name"):
4839         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
4840       fname = self.cfg.ExpandInstanceName(self.op.name)
4841       if fname is None:
4842         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
4843                                    self.op.name)
4844       self.op.name = fname
4845     else:
4846       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
4847                                  self.op.mode)
4848
4849     if self.op.direction == constants.ALF_DIR_OUT:
4850       if not hasattr(self.op, "allocator"):
4851         raise errors.OpPrereqError("Missing allocator name")
4852       raise errors.OpPrereqError("Allocator out mode not supported yet")
4853     elif self.op.direction != constants.ALF_DIR_IN:
4854       raise errors.OpPrereqError("Wrong allocator test '%s'" %
4855                                  self.op.direction)
4856
4857   def Exec(self, feedback_fn):
4858     """Run the allocator test.
4859
4860     """
4861     data = _AllocatorGetClusterData(self.cfg, self.sstore)
4862     if self.op.mode == constants.ALF_MODE_ALLOC:
4863       _AllocatorAddNewInstance(data, self.op)
4864     else:
4865       _AllocatorAddRelocateInstance(data, self.op)
4866
4867     if _JSON_INDENT is None:
4868       text = simplejson.dumps(data)
4869     else:
4870       text = simplejson.dumps(data, indent=_JSON_INDENT)
4871     return text