Verify: add more instance information to node_info
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33 import simplejson
34
35 from ganeti import rpc
36 from ganeti import ssh
37 from ganeti import logger
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import config
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import ssconf
46
47
48 # Check whether the simplejson module supports indentation
49 _JSON_INDENT = 2
50 try:
51   simplejson.dumps(1, indent=_JSON_INDENT)
52 except TypeError:
53   _JSON_INDENT = None
54
55
56 class LogicalUnit(object):
57   """Logical Unit base class.
58
59   Subclasses must follow these rules:
60     - implement CheckPrereq which also fills in the opcode instance
61       with all the fields (even if as None)
62     - implement Exec
63     - implement BuildHooksEnv
64     - redefine HPATH and HTYPE
65     - optionally redefine their run requirements (REQ_CLUSTER,
66       REQ_MASTER); note that all commands require root permissions
67
68   """
69   HPATH = None
70   HTYPE = None
71   _OP_REQP = []
72   REQ_CLUSTER = True
73   REQ_MASTER = True
74
75   def __init__(self, processor, op, cfg, sstore):
76     """Constructor for LogicalUnit.
77
78     This needs to be overriden in derived classes in order to check op
79     validity.
80
81     """
82     self.proc = processor
83     self.op = op
84     self.cfg = cfg
85     self.sstore = sstore
86     for attr_name in self._OP_REQP:
87       attr_val = getattr(op, attr_name, None)
88       if attr_val is None:
89         raise errors.OpPrereqError("Required parameter '%s' missing" %
90                                    attr_name)
91     if self.REQ_CLUSTER:
92       if not cfg.IsCluster():
93         raise errors.OpPrereqError("Cluster not initialized yet,"
94                                    " use 'gnt-cluster init' first.")
95       if self.REQ_MASTER:
96         master = sstore.GetMasterNode()
97         if master != utils.HostInfo().name:
98           raise errors.OpPrereqError("Commands must be run on the master"
99                                      " node %s" % master)
100
101   def CheckPrereq(self):
102     """Check prerequisites for this LU.
103
104     This method should check that the prerequisites for the execution
105     of this LU are fulfilled. It can do internode communication, but
106     it should be idempotent - no cluster or system changes are
107     allowed.
108
109     The method should raise errors.OpPrereqError in case something is
110     not fulfilled. Its return value is ignored.
111
112     This method should also update all the parameters of the opcode to
113     their canonical form; e.g. a short node name must be fully
114     expanded after this method has successfully completed (so that
115     hooks, logging, etc. work correctly).
116
117     """
118     raise NotImplementedError
119
120   def Exec(self, feedback_fn):
121     """Execute the LU.
122
123     This method should implement the actual work. It should raise
124     errors.OpExecError for failures that are somewhat dealt with in
125     code, or expected.
126
127     """
128     raise NotImplementedError
129
130   def BuildHooksEnv(self):
131     """Build hooks environment for this LU.
132
133     This method should return a three-node tuple consisting of: a dict
134     containing the environment that will be used for running the
135     specific hook for this LU, a list of node names on which the hook
136     should run before the execution, and a list of node names on which
137     the hook should run after the execution.
138
139     The keys of the dict must not have 'GANETI_' prefixed as this will
140     be handled in the hooks runner. Also note additional keys will be
141     added by the hooks runner. If the LU doesn't define any
142     environment, an empty dict (and not None) should be returned.
143
144     As for the node lists, the master should not be included in the
145     them, as it will be added by the hooks runner in case this LU
146     requires a cluster to run on (otherwise we don't have a node
147     list). No nodes should be returned as an empty list (and not
148     None).
149
150     Note that if the HPATH for a LU class is None, this function will
151     not be called.
152
153     """
154     raise NotImplementedError
155
156
157 class NoHooksLU(LogicalUnit):
158   """Simple LU which runs no hooks.
159
160   This LU is intended as a parent for other LogicalUnits which will
161   run no hooks, in order to reduce duplicate code.
162
163   """
164   HPATH = None
165   HTYPE = None
166
167   def BuildHooksEnv(self):
168     """Build hooks env.
169
170     This is a no-op, since we don't run hooks.
171
172     """
173     return {}, [], []
174
175
176 def _AddHostToEtcHosts(hostname):
177   """Wrapper around utils.SetEtcHostsEntry.
178
179   """
180   hi = utils.HostInfo(name=hostname)
181   utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
182
183
184 def _RemoveHostFromEtcHosts(hostname):
185   """Wrapper around utils.RemoveEtcHostsEntry.
186
187   """
188   hi = utils.HostInfo(name=hostname)
189   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
190   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
191
192
193 def _GetWantedNodes(lu, nodes):
194   """Returns list of checked and expanded node names.
195
196   Args:
197     nodes: List of nodes (strings) or None for all
198
199   """
200   if not isinstance(nodes, list):
201     raise errors.OpPrereqError("Invalid argument type 'nodes'")
202
203   if nodes:
204     wanted = []
205
206     for name in nodes:
207       node = lu.cfg.ExpandNodeName(name)
208       if node is None:
209         raise errors.OpPrereqError("No such node name '%s'" % name)
210       wanted.append(node)
211
212   else:
213     wanted = lu.cfg.GetNodeList()
214   return utils.NiceSort(wanted)
215
216
217 def _GetWantedInstances(lu, instances):
218   """Returns list of checked and expanded instance names.
219
220   Args:
221     instances: List of instances (strings) or None for all
222
223   """
224   if not isinstance(instances, list):
225     raise errors.OpPrereqError("Invalid argument type 'instances'")
226
227   if instances:
228     wanted = []
229
230     for name in instances:
231       instance = lu.cfg.ExpandInstanceName(name)
232       if instance is None:
233         raise errors.OpPrereqError("No such instance name '%s'" % name)
234       wanted.append(instance)
235
236   else:
237     wanted = lu.cfg.GetInstanceList()
238   return utils.NiceSort(wanted)
239
240
241 def _CheckOutputFields(static, dynamic, selected):
242   """Checks whether all selected fields are valid.
243
244   Args:
245     static: Static fields
246     dynamic: Dynamic fields
247
248   """
249   static_fields = frozenset(static)
250   dynamic_fields = frozenset(dynamic)
251
252   all_fields = static_fields | dynamic_fields
253
254   if not all_fields.issuperset(selected):
255     raise errors.OpPrereqError("Unknown output fields selected: %s"
256                                % ",".join(frozenset(selected).
257                                           difference(all_fields)))
258
259
260 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
261                           memory, vcpus, nics):
262   """Builds instance related env variables for hooks from single variables.
263
264   Args:
265     secondary_nodes: List of secondary nodes as strings
266   """
267   env = {
268     "OP_TARGET": name,
269     "INSTANCE_NAME": name,
270     "INSTANCE_PRIMARY": primary_node,
271     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
272     "INSTANCE_OS_TYPE": os_type,
273     "INSTANCE_STATUS": status,
274     "INSTANCE_MEMORY": memory,
275     "INSTANCE_VCPUS": vcpus,
276   }
277
278   if nics:
279     nic_count = len(nics)
280     for idx, (ip, bridge, mac) in enumerate(nics):
281       if ip is None:
282         ip = ""
283       env["INSTANCE_NIC%d_IP" % idx] = ip
284       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
285       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
286   else:
287     nic_count = 0
288
289   env["INSTANCE_NIC_COUNT"] = nic_count
290
291   return env
292
293
294 def _BuildInstanceHookEnvByObject(instance, override=None):
295   """Builds instance related env variables for hooks from an object.
296
297   Args:
298     instance: objects.Instance object of instance
299     override: dict of values to override
300   """
301   args = {
302     'name': instance.name,
303     'primary_node': instance.primary_node,
304     'secondary_nodes': instance.secondary_nodes,
305     'os_type': instance.os,
306     'status': instance.os,
307     'memory': instance.memory,
308     'vcpus': instance.vcpus,
309     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
310   }
311   if override:
312     args.update(override)
313   return _BuildInstanceHookEnv(**args)
314
315
316 def _UpdateKnownHosts(fullnode, ip, pubkey):
317   """Ensure a node has a correct known_hosts entry.
318
319   Args:
320     fullnode - Fully qualified domain name of host. (str)
321     ip       - IPv4 address of host (str)
322     pubkey   - the public key of the cluster
323
324   """
325   if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
326     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
327   else:
328     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
329
330   inthere = False
331
332   save_lines = []
333   add_lines = []
334   removed = False
335
336   for rawline in f:
337     logger.Debug('read %s' % (repr(rawline),))
338
339     parts = rawline.rstrip('\r\n').split()
340
341     # Ignore unwanted lines
342     if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
343       fields = parts[0].split(',')
344       key = parts[2]
345
346       haveall = True
347       havesome = False
348       for spec in [ ip, fullnode ]:
349         if spec not in fields:
350           haveall = False
351         if spec in fields:
352           havesome = True
353
354       logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
355       if haveall and key == pubkey:
356         inthere = True
357         save_lines.append(rawline)
358         logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
359         continue
360
361       if havesome and (not haveall or key != pubkey):
362         removed = True
363         logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
364         continue
365
366     save_lines.append(rawline)
367
368   if not inthere:
369     add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
370     logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
371
372   if removed:
373     save_lines = save_lines + add_lines
374
375     # Write a new file and replace old.
376     fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
377                                    constants.DATA_DIR)
378     newfile = os.fdopen(fd, 'w')
379     try:
380       newfile.write(''.join(save_lines))
381     finally:
382       newfile.close()
383     logger.Debug("Wrote new known_hosts.")
384     os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
385
386   elif add_lines:
387     # Simply appending a new line will do the trick.
388     f.seek(0, 2)
389     for add in add_lines:
390       f.write(add)
391
392   f.close()
393
394
395 def _HasValidVG(vglist, vgname):
396   """Checks if the volume group list is valid.
397
398   A non-None return value means there's an error, and the return value
399   is the error message.
400
401   """
402   vgsize = vglist.get(vgname, None)
403   if vgsize is None:
404     return "volume group '%s' missing" % vgname
405   elif vgsize < 20480:
406     return ("volume group '%s' too small (20480MiB required, %dMib found)" %
407             (vgname, vgsize))
408   return None
409
410
411 def _InitSSHSetup(node):
412   """Setup the SSH configuration for the cluster.
413
414
415   This generates a dsa keypair for root, adds the pub key to the
416   permitted hosts and adds the hostkey to its own known hosts.
417
418   Args:
419     node: the name of this host as a fqdn
420
421   """
422   priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
423
424   for name in priv_key, pub_key:
425     if os.path.exists(name):
426       utils.CreateBackup(name)
427     utils.RemoveFile(name)
428
429   result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
430                          "-f", priv_key,
431                          "-q", "-N", ""])
432   if result.failed:
433     raise errors.OpExecError("Could not generate ssh keypair, error %s" %
434                              result.output)
435
436   f = open(pub_key, 'r')
437   try:
438     utils.AddAuthorizedKey(auth_keys, f.read(8192))
439   finally:
440     f.close()
441
442
443 def _InitGanetiServerSetup(ss):
444   """Setup the necessary configuration for the initial node daemon.
445
446   This creates the nodepass file containing the shared password for
447   the cluster and also generates the SSL certificate.
448
449   """
450   # Create pseudo random password
451   randpass = sha.new(os.urandom(64)).hexdigest()
452   # and write it into sstore
453   ss.SetKey(ss.SS_NODED_PASS, randpass)
454
455   result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
456                          "-days", str(365*5), "-nodes", "-x509",
457                          "-keyout", constants.SSL_CERT_FILE,
458                          "-out", constants.SSL_CERT_FILE, "-batch"])
459   if result.failed:
460     raise errors.OpExecError("could not generate server ssl cert, command"
461                              " %s had exitcode %s and error message %s" %
462                              (result.cmd, result.exit_code, result.output))
463
464   os.chmod(constants.SSL_CERT_FILE, 0400)
465
466   result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
467
468   if result.failed:
469     raise errors.OpExecError("Could not start the node daemon, command %s"
470                              " had exitcode %s and error %s" %
471                              (result.cmd, result.exit_code, result.output))
472
473
474 def _CheckInstanceBridgesExist(instance):
475   """Check that the brigdes needed by an instance exist.
476
477   """
478   # check bridges existance
479   brlist = [nic.bridge for nic in instance.nics]
480   if not rpc.call_bridges_exist(instance.primary_node, brlist):
481     raise errors.OpPrereqError("one or more target bridges %s does not"
482                                " exist on destination node '%s'" %
483                                (brlist, instance.primary_node))
484
485
486 class LUInitCluster(LogicalUnit):
487   """Initialise the cluster.
488
489   """
490   HPATH = "cluster-init"
491   HTYPE = constants.HTYPE_CLUSTER
492   _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
493               "def_bridge", "master_netdev"]
494   REQ_CLUSTER = False
495
496   def BuildHooksEnv(self):
497     """Build hooks env.
498
499     Notes: Since we don't require a cluster, we must manually add
500     ourselves in the post-run node list.
501
502     """
503     env = {"OP_TARGET": self.op.cluster_name}
504     return env, [], [self.hostname.name]
505
506   def CheckPrereq(self):
507     """Verify that the passed name is a valid one.
508
509     """
510     if config.ConfigWriter.IsCluster():
511       raise errors.OpPrereqError("Cluster is already initialised")
512
513     if self.op.hypervisor_type == constants.HT_XEN_HVM31:
514       if not os.path.exists(constants.VNC_PASSWORD_FILE):
515         raise errors.OpPrereqError("Please prepare the cluster VNC"
516                                    "password file %s" %
517                                    constants.VNC_PASSWORD_FILE)
518
519     self.hostname = hostname = utils.HostInfo()
520
521     if hostname.ip.startswith("127."):
522       raise errors.OpPrereqError("This host's IP resolves to the private"
523                                  " range (%s). Please fix DNS or %s." %
524                                  (hostname.ip, constants.ETC_HOSTS))
525
526     if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
527                          source=constants.LOCALHOST_IP_ADDRESS):
528       raise errors.OpPrereqError("Inconsistency: this host's name resolves"
529                                  " to %s,\nbut this ip address does not"
530                                  " belong to this host."
531                                  " Aborting." % hostname.ip)
532
533     self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
534
535     if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
536                      timeout=5):
537       raise errors.OpPrereqError("Cluster IP already active. Aborting.")
538
539     secondary_ip = getattr(self.op, "secondary_ip", None)
540     if secondary_ip and not utils.IsValidIP(secondary_ip):
541       raise errors.OpPrereqError("Invalid secondary ip given")
542     if (secondary_ip and
543         secondary_ip != hostname.ip and
544         (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
545                            source=constants.LOCALHOST_IP_ADDRESS))):
546       raise errors.OpPrereqError("You gave %s as secondary IP,"
547                                  " but it does not belong to this host." %
548                                  secondary_ip)
549     self.secondary_ip = secondary_ip
550
551     # checks presence of the volume group given
552     vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
553
554     if vgstatus:
555       raise errors.OpPrereqError("Error: %s" % vgstatus)
556
557     if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
558                     self.op.mac_prefix):
559       raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
560                                  self.op.mac_prefix)
561
562     if self.op.hypervisor_type not in constants.HYPER_TYPES:
563       raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
564                                  self.op.hypervisor_type)
565
566     result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
567     if result.failed:
568       raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
569                                  (self.op.master_netdev,
570                                   result.output.strip()))
571
572     if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
573             os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
574       raise errors.OpPrereqError("Init.d script '%s' missing or not"
575                                  " executable." % constants.NODE_INITD_SCRIPT)
576
577   def Exec(self, feedback_fn):
578     """Initialize the cluster.
579
580     """
581     clustername = self.clustername
582     hostname = self.hostname
583
584     # set up the simple store
585     self.sstore = ss = ssconf.SimpleStore()
586     ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
587     ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
588     ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
589     ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
590     ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
591
592     # set up the inter-node password and certificate
593     _InitGanetiServerSetup(ss)
594
595     # start the master ip
596     rpc.call_node_start_master(hostname.name)
597
598     # set up ssh config and /etc/hosts
599     f = open(constants.SSH_HOST_RSA_PUB, 'r')
600     try:
601       sshline = f.read()
602     finally:
603       f.close()
604     sshkey = sshline.split(" ")[1]
605
606     _AddHostToEtcHosts(hostname.name)
607
608     _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
609
610     _InitSSHSetup(hostname.name)
611
612     # init of cluster config file
613     self.cfg = cfgw = config.ConfigWriter()
614     cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
615                     sshkey, self.op.mac_prefix,
616                     self.op.vg_name, self.op.def_bridge)
617
618
619 class LUDestroyCluster(NoHooksLU):
620   """Logical unit for destroying the cluster.
621
622   """
623   _OP_REQP = []
624
625   def CheckPrereq(self):
626     """Check prerequisites.
627
628     This checks whether the cluster is empty.
629
630     Any errors are signalled by raising errors.OpPrereqError.
631
632     """
633     master = self.sstore.GetMasterNode()
634
635     nodelist = self.cfg.GetNodeList()
636     if len(nodelist) != 1 or nodelist[0] != master:
637       raise errors.OpPrereqError("There are still %d node(s) in"
638                                  " this cluster." % (len(nodelist) - 1))
639     instancelist = self.cfg.GetInstanceList()
640     if instancelist:
641       raise errors.OpPrereqError("There are still %d instance(s) in"
642                                  " this cluster." % len(instancelist))
643
644   def Exec(self, feedback_fn):
645     """Destroys the cluster.
646
647     """
648     master = self.sstore.GetMasterNode()
649     if not rpc.call_node_stop_master(master):
650       raise errors.OpExecError("Could not disable the master role")
651     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
652     utils.CreateBackup(priv_key)
653     utils.CreateBackup(pub_key)
654     rpc.call_node_leave_cluster(master)
655
656
657 class LUVerifyCluster(NoHooksLU):
658   """Verifies the cluster status.
659
660   """
661   _OP_REQP = []
662
663   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
664                   remote_version, feedback_fn):
665     """Run multiple tests against a node.
666
667     Test list:
668       - compares ganeti version
669       - checks vg existance and size > 20G
670       - checks config file checksum
671       - checks ssh to other nodes
672
673     Args:
674       node: name of the node to check
675       file_list: required list of files
676       local_cksum: dictionary of local files and their checksums
677
678     """
679     # compares ganeti version
680     local_version = constants.PROTOCOL_VERSION
681     if not remote_version:
682       feedback_fn("  - ERROR: connection to %s failed" % (node))
683       return True
684
685     if local_version != remote_version:
686       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
687                       (local_version, node, remote_version))
688       return True
689
690     # checks vg existance and size > 20G
691
692     bad = False
693     if not vglist:
694       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
695                       (node,))
696       bad = True
697     else:
698       vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
699       if vgstatus:
700         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
701         bad = True
702
703     # checks config file checksum
704     # checks ssh to any
705
706     if 'filelist' not in node_result:
707       bad = True
708       feedback_fn("  - ERROR: node hasn't returned file checksum data")
709     else:
710       remote_cksum = node_result['filelist']
711       for file_name in file_list:
712         if file_name not in remote_cksum:
713           bad = True
714           feedback_fn("  - ERROR: file '%s' missing" % file_name)
715         elif remote_cksum[file_name] != local_cksum[file_name]:
716           bad = True
717           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
718
719     if 'nodelist' not in node_result:
720       bad = True
721       feedback_fn("  - ERROR: node hasn't returned node connectivity data")
722     else:
723       if node_result['nodelist']:
724         bad = True
725         for node in node_result['nodelist']:
726           feedback_fn("  - ERROR: communication with node '%s': %s" %
727                           (node, node_result['nodelist'][node]))
728     hyp_result = node_result.get('hypervisor', None)
729     if hyp_result is not None:
730       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
731     return bad
732
733   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
734                       node_instance, feedback_fn):
735     """Verify an instance.
736
737     This function checks to see if the required block devices are
738     available on the instance's node.
739
740     """
741     bad = False
742
743     node_current = instanceconfig.primary_node
744
745     node_vol_should = {}
746     instanceconfig.MapLVsByNode(node_vol_should)
747
748     for node in node_vol_should:
749       for volume in node_vol_should[node]:
750         if node not in node_vol_is or volume not in node_vol_is[node]:
751           feedback_fn("  - ERROR: volume %s missing on node %s" %
752                           (volume, node))
753           bad = True
754
755     if not instanceconfig.status == 'down':
756       if (node_current not in node_instance or
757           not instance in node_instance[node_current]):
758         feedback_fn("  - ERROR: instance %s not running on node %s" %
759                         (instance, node_current))
760         bad = True
761
762     for node in node_instance:
763       if (not node == node_current):
764         if instance in node_instance[node]:
765           feedback_fn("  - ERROR: instance %s should not run on node %s" %
766                           (instance, node))
767           bad = True
768
769     return bad
770
771   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
772     """Verify if there are any unknown volumes in the cluster.
773
774     The .os, .swap and backup volumes are ignored. All other volumes are
775     reported as unknown.
776
777     """
778     bad = False
779
780     for node in node_vol_is:
781       for volume in node_vol_is[node]:
782         if node not in node_vol_should or volume not in node_vol_should[node]:
783           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
784                       (volume, node))
785           bad = True
786     return bad
787
788   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
789     """Verify the list of running instances.
790
791     This checks what instances are running but unknown to the cluster.
792
793     """
794     bad = False
795     for node in node_instance:
796       for runninginstance in node_instance[node]:
797         if runninginstance not in instancelist:
798           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
799                           (runninginstance, node))
800           bad = True
801     return bad
802
803   def CheckPrereq(self):
804     """Check prerequisites.
805
806     This has no prerequisites.
807
808     """
809     pass
810
811   def Exec(self, feedback_fn):
812     """Verify integrity of cluster, performing various test on nodes.
813
814     """
815     bad = False
816     feedback_fn("* Verifying global settings")
817     for msg in self.cfg.VerifyConfig():
818       feedback_fn("  - ERROR: %s" % msg)
819
820     vg_name = self.cfg.GetVGName()
821     nodelist = utils.NiceSort(self.cfg.GetNodeList())
822     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
823     i_non_redundant = [] # Non redundant instances
824     node_volume = {}
825     node_instance = {}
826     node_info = {}
827
828     # FIXME: verify OS list
829     # do local checksums
830     file_names = list(self.sstore.GetFileList())
831     file_names.append(constants.SSL_CERT_FILE)
832     file_names.append(constants.CLUSTER_CONF_FILE)
833     local_checksums = utils.FingerprintFiles(file_names)
834
835     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
836     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
837     all_instanceinfo = rpc.call_instance_list(nodelist)
838     all_vglist = rpc.call_vg_list(nodelist)
839     node_verify_param = {
840       'filelist': file_names,
841       'nodelist': nodelist,
842       'hypervisor': None,
843       }
844     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
845     all_rversion = rpc.call_version(nodelist)
846     all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
847
848     for node in nodelist:
849       feedback_fn("* Verifying node %s" % node)
850       result = self._VerifyNode(node, file_names, local_checksums,
851                                 all_vglist[node], all_nvinfo[node],
852                                 all_rversion[node], feedback_fn)
853       bad = bad or result
854
855       # node_volume
856       volumeinfo = all_volumeinfo[node]
857
858       if isinstance(volumeinfo, basestring):
859         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
860                     (node, volumeinfo[-400:].encode('string_escape')))
861         bad = True
862         node_volume[node] = {}
863       elif not isinstance(volumeinfo, dict):
864         feedback_fn("  - ERROR: connection to %s failed" % (node,))
865         bad = True
866         continue
867       else:
868         node_volume[node] = volumeinfo
869
870       # node_instance
871       nodeinstance = all_instanceinfo[node]
872       if type(nodeinstance) != list:
873         feedback_fn("  - ERROR: connection to %s failed" % (node,))
874         bad = True
875         continue
876
877       node_instance[node] = nodeinstance
878
879       # node_info
880       nodeinfo = all_ninfo[node]
881       if not isinstance(nodeinfo, dict):
882         feedback_fn("  - ERROR: connection to %s failed" % (node,))
883         bad = True
884         continue
885
886       try:
887         node_info[node] = {
888           "mfree": int(nodeinfo['memory_free']),
889           "dfree": int(nodeinfo['vg_free']),
890           "pinst": [],
891           "sinst": [],
892           # dictionary holding all instances this node is secondary for,
893           # grouped by their primary node. Each key is a cluster node, and each
894           # value is a list of instances which have the key as primary and the
895           # current node as secondary.  this is handy to calculate N+1 memory
896           # availability if you can only failover from a primary to its
897           # secondary.
898           "sinst-by-pnode": {},
899         }
900       except ValueError:
901         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
902         bad = True
903         continue
904
905     node_vol_should = {}
906
907     for instance in instancelist:
908       feedback_fn("* Verifying instance %s" % instance)
909       inst_config = self.cfg.GetInstanceInfo(instance)
910       result =  self._VerifyInstance(instance, inst_config, node_volume,
911                                      node_instance, feedback_fn)
912       bad = bad or result
913
914       inst_config.MapLVsByNode(node_vol_should)
915
916       pnode = inst_config.primary_node
917       if pnode in node_info:
918         node_info[pnode]['pinst'].append(instance)
919       else:
920         feedback_fn("  - ERROR: instance %s, connection to primary node"
921                     " %s failed" % (instance, pnode))
922         bad = True
923
924       # If the instance is non-redundant we cannot survive losing its primary
925       # node, so we are not N+1 compliant. On the other hand we have no disk
926       # templates with more than one secondary so that situation is not well
927       # supported either.
928       # FIXME: does not support file-backed instances
929       if len(inst_config.secondary_nodes) == 0:
930         i_non_redundant.append(instance)
931       elif len(inst_config.secondary_nodes) > 1:
932         feedback_fn("  - WARNING: multiple secondaries for instance %s"
933                     % instance)
934
935       for snode in inst_config.secondary_nodes:
936         if snode in node_info:
937           node_info[snode]['sinst'].append(instance)
938           if pnode not in node_info[snode]['sinst-by-pnode']:
939             node_info[snode]['sinst-by-pnode'][pnode] = []
940           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
941         else:
942           feedback_fn("  - ERROR: instance %s, connection to secondary node"
943                       " %s failed" % (instance, snode))
944
945     feedback_fn("* Verifying orphan volumes")
946     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
947                                        feedback_fn)
948     bad = bad or result
949
950     feedback_fn("* Verifying remaining instances")
951     result = self._VerifyOrphanInstances(instancelist, node_instance,
952                                          feedback_fn)
953     bad = bad or result
954
955     return int(bad)
956
957
958 class LUVerifyDisks(NoHooksLU):
959   """Verifies the cluster disks status.
960
961   """
962   _OP_REQP = []
963
964   def CheckPrereq(self):
965     """Check prerequisites.
966
967     This has no prerequisites.
968
969     """
970     pass
971
972   def Exec(self, feedback_fn):
973     """Verify integrity of cluster disks.
974
975     """
976     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
977
978     vg_name = self.cfg.GetVGName()
979     nodes = utils.NiceSort(self.cfg.GetNodeList())
980     instances = [self.cfg.GetInstanceInfo(name)
981                  for name in self.cfg.GetInstanceList()]
982
983     nv_dict = {}
984     for inst in instances:
985       inst_lvs = {}
986       if (inst.status != "up" or
987           inst.disk_template not in constants.DTS_NET_MIRROR):
988         continue
989       inst.MapLVsByNode(inst_lvs)
990       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
991       for node, vol_list in inst_lvs.iteritems():
992         for vol in vol_list:
993           nv_dict[(node, vol)] = inst
994
995     if not nv_dict:
996       return result
997
998     node_lvs = rpc.call_volume_list(nodes, vg_name)
999
1000     to_act = set()
1001     for node in nodes:
1002       # node_volume
1003       lvs = node_lvs[node]
1004
1005       if isinstance(lvs, basestring):
1006         logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
1007         res_nlvm[node] = lvs
1008       elif not isinstance(lvs, dict):
1009         logger.Info("connection to node %s failed or invalid data returned" %
1010                     (node,))
1011         res_nodes.append(node)
1012         continue
1013
1014       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1015         inst = nv_dict.pop((node, lv_name), None)
1016         if (not lv_online and inst is not None
1017             and inst.name not in res_instances):
1018           res_instances.append(inst.name)
1019
1020     # any leftover items in nv_dict are missing LVs, let's arrange the
1021     # data better
1022     for key, inst in nv_dict.iteritems():
1023       if inst.name not in res_missing:
1024         res_missing[inst.name] = []
1025       res_missing[inst.name].append(key)
1026
1027     return result
1028
1029
1030 class LURenameCluster(LogicalUnit):
1031   """Rename the cluster.
1032
1033   """
1034   HPATH = "cluster-rename"
1035   HTYPE = constants.HTYPE_CLUSTER
1036   _OP_REQP = ["name"]
1037
1038   def BuildHooksEnv(self):
1039     """Build hooks env.
1040
1041     """
1042     env = {
1043       "OP_TARGET": self.sstore.GetClusterName(),
1044       "NEW_NAME": self.op.name,
1045       }
1046     mn = self.sstore.GetMasterNode()
1047     return env, [mn], [mn]
1048
1049   def CheckPrereq(self):
1050     """Verify that the passed name is a valid one.
1051
1052     """
1053     hostname = utils.HostInfo(self.op.name)
1054
1055     new_name = hostname.name
1056     self.ip = new_ip = hostname.ip
1057     old_name = self.sstore.GetClusterName()
1058     old_ip = self.sstore.GetMasterIP()
1059     if new_name == old_name and new_ip == old_ip:
1060       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1061                                  " cluster has changed")
1062     if new_ip != old_ip:
1063       result = utils.RunCmd(["fping", "-q", new_ip])
1064       if not result.failed:
1065         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1066                                    " reachable on the network. Aborting." %
1067                                    new_ip)
1068
1069     self.op.name = new_name
1070
1071   def Exec(self, feedback_fn):
1072     """Rename the cluster.
1073
1074     """
1075     clustername = self.op.name
1076     ip = self.ip
1077     ss = self.sstore
1078
1079     # shutdown the master IP
1080     master = ss.GetMasterNode()
1081     if not rpc.call_node_stop_master(master):
1082       raise errors.OpExecError("Could not disable the master role")
1083
1084     try:
1085       # modify the sstore
1086       ss.SetKey(ss.SS_MASTER_IP, ip)
1087       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1088
1089       # Distribute updated ss config to all nodes
1090       myself = self.cfg.GetNodeInfo(master)
1091       dist_nodes = self.cfg.GetNodeList()
1092       if myself.name in dist_nodes:
1093         dist_nodes.remove(myself.name)
1094
1095       logger.Debug("Copying updated ssconf data to all nodes")
1096       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1097         fname = ss.KeyToFilename(keyname)
1098         result = rpc.call_upload_file(dist_nodes, fname)
1099         for to_node in dist_nodes:
1100           if not result[to_node]:
1101             logger.Error("copy of file %s to node %s failed" %
1102                          (fname, to_node))
1103     finally:
1104       if not rpc.call_node_start_master(master):
1105         logger.Error("Could not re-enable the master role on the master,"
1106                      " please restart manually.")
1107
1108
1109 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1110   """Sleep and poll for an instance's disk to sync.
1111
1112   """
1113   if not instance.disks:
1114     return True
1115
1116   if not oneshot:
1117     proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1118
1119   node = instance.primary_node
1120
1121   for dev in instance.disks:
1122     cfgw.SetDiskID(dev, node)
1123
1124   retries = 0
1125   while True:
1126     max_time = 0
1127     done = True
1128     cumul_degraded = False
1129     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1130     if not rstats:
1131       proc.LogWarning("Can't get any data from node %s" % node)
1132       retries += 1
1133       if retries >= 10:
1134         raise errors.RemoteError("Can't contact node %s for mirror data,"
1135                                  " aborting." % node)
1136       time.sleep(6)
1137       continue
1138     retries = 0
1139     for i in range(len(rstats)):
1140       mstat = rstats[i]
1141       if mstat is None:
1142         proc.LogWarning("Can't compute data for node %s/%s" %
1143                         (node, instance.disks[i].iv_name))
1144         continue
1145       # we ignore the ldisk parameter
1146       perc_done, est_time, is_degraded, _ = mstat
1147       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1148       if perc_done is not None:
1149         done = False
1150         if est_time is not None:
1151           rem_time = "%d estimated seconds remaining" % est_time
1152           max_time = est_time
1153         else:
1154           rem_time = "no time estimate"
1155         proc.LogInfo("- device %s: %5.2f%% done, %s" %
1156                      (instance.disks[i].iv_name, perc_done, rem_time))
1157     if done or oneshot:
1158       break
1159
1160     if unlock:
1161       utils.Unlock('cmd')
1162     try:
1163       time.sleep(min(60, max_time))
1164     finally:
1165       if unlock:
1166         utils.Lock('cmd')
1167
1168   if done:
1169     proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1170   return not cumul_degraded
1171
1172
1173 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1174   """Check that mirrors are not degraded.
1175
1176   The ldisk parameter, if True, will change the test from the
1177   is_degraded attribute (which represents overall non-ok status for
1178   the device(s)) to the ldisk (representing the local storage status).
1179
1180   """
1181   cfgw.SetDiskID(dev, node)
1182   if ldisk:
1183     idx = 6
1184   else:
1185     idx = 5
1186
1187   result = True
1188   if on_primary or dev.AssembleOnSecondary():
1189     rstats = rpc.call_blockdev_find(node, dev)
1190     if not rstats:
1191       logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1192       result = False
1193     else:
1194       result = result and (not rstats[idx])
1195   if dev.children:
1196     for child in dev.children:
1197       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1198
1199   return result
1200
1201
1202 class LUDiagnoseOS(NoHooksLU):
1203   """Logical unit for OS diagnose/query.
1204
1205   """
1206   _OP_REQP = []
1207
1208   def CheckPrereq(self):
1209     """Check prerequisites.
1210
1211     This always succeeds, since this is a pure query LU.
1212
1213     """
1214     return
1215
1216   def Exec(self, feedback_fn):
1217     """Compute the list of OSes.
1218
1219     """
1220     node_list = self.cfg.GetNodeList()
1221     node_data = rpc.call_os_diagnose(node_list)
1222     if node_data == False:
1223       raise errors.OpExecError("Can't gather the list of OSes")
1224     return node_data
1225
1226
1227 class LURemoveNode(LogicalUnit):
1228   """Logical unit for removing a node.
1229
1230   """
1231   HPATH = "node-remove"
1232   HTYPE = constants.HTYPE_NODE
1233   _OP_REQP = ["node_name"]
1234
1235   def BuildHooksEnv(self):
1236     """Build hooks env.
1237
1238     This doesn't run on the target node in the pre phase as a failed
1239     node would not allows itself to run.
1240
1241     """
1242     env = {
1243       "OP_TARGET": self.op.node_name,
1244       "NODE_NAME": self.op.node_name,
1245       }
1246     all_nodes = self.cfg.GetNodeList()
1247     all_nodes.remove(self.op.node_name)
1248     return env, all_nodes, all_nodes
1249
1250   def CheckPrereq(self):
1251     """Check prerequisites.
1252
1253     This checks:
1254      - the node exists in the configuration
1255      - it does not have primary or secondary instances
1256      - it's not the master
1257
1258     Any errors are signalled by raising errors.OpPrereqError.
1259
1260     """
1261     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1262     if node is None:
1263       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1264
1265     instance_list = self.cfg.GetInstanceList()
1266
1267     masternode = self.sstore.GetMasterNode()
1268     if node.name == masternode:
1269       raise errors.OpPrereqError("Node is the master node,"
1270                                  " you need to failover first.")
1271
1272     for instance_name in instance_list:
1273       instance = self.cfg.GetInstanceInfo(instance_name)
1274       if node.name == instance.primary_node:
1275         raise errors.OpPrereqError("Instance %s still running on the node,"
1276                                    " please remove first." % instance_name)
1277       if node.name in instance.secondary_nodes:
1278         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1279                                    " please remove first." % instance_name)
1280     self.op.node_name = node.name
1281     self.node = node
1282
1283   def Exec(self, feedback_fn):
1284     """Removes the node from the cluster.
1285
1286     """
1287     node = self.node
1288     logger.Info("stopping the node daemon and removing configs from node %s" %
1289                 node.name)
1290
1291     rpc.call_node_leave_cluster(node.name)
1292
1293     ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1294
1295     logger.Info("Removing node %s from config" % node.name)
1296
1297     self.cfg.RemoveNode(node.name)
1298
1299     _RemoveHostFromEtcHosts(node.name)
1300
1301
1302 class LUQueryNodes(NoHooksLU):
1303   """Logical unit for querying nodes.
1304
1305   """
1306   _OP_REQP = ["output_fields", "names"]
1307
1308   def CheckPrereq(self):
1309     """Check prerequisites.
1310
1311     This checks that the fields required are valid output fields.
1312
1313     """
1314     self.dynamic_fields = frozenset(["dtotal", "dfree",
1315                                      "mtotal", "mnode", "mfree",
1316                                      "bootid"])
1317
1318     _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1319                                "pinst_list", "sinst_list",
1320                                "pip", "sip"],
1321                        dynamic=self.dynamic_fields,
1322                        selected=self.op.output_fields)
1323
1324     self.wanted = _GetWantedNodes(self, self.op.names)
1325
1326   def Exec(self, feedback_fn):
1327     """Computes the list of nodes and their attributes.
1328
1329     """
1330     nodenames = self.wanted
1331     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1332
1333     # begin data gathering
1334
1335     if self.dynamic_fields.intersection(self.op.output_fields):
1336       live_data = {}
1337       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1338       for name in nodenames:
1339         nodeinfo = node_data.get(name, None)
1340         if nodeinfo:
1341           live_data[name] = {
1342             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1343             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1344             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1345             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1346             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1347             "bootid": nodeinfo['bootid'],
1348             }
1349         else:
1350           live_data[name] = {}
1351     else:
1352       live_data = dict.fromkeys(nodenames, {})
1353
1354     node_to_primary = dict([(name, set()) for name in nodenames])
1355     node_to_secondary = dict([(name, set()) for name in nodenames])
1356
1357     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1358                              "sinst_cnt", "sinst_list"))
1359     if inst_fields & frozenset(self.op.output_fields):
1360       instancelist = self.cfg.GetInstanceList()
1361
1362       for instance_name in instancelist:
1363         inst = self.cfg.GetInstanceInfo(instance_name)
1364         if inst.primary_node in node_to_primary:
1365           node_to_primary[inst.primary_node].add(inst.name)
1366         for secnode in inst.secondary_nodes:
1367           if secnode in node_to_secondary:
1368             node_to_secondary[secnode].add(inst.name)
1369
1370     # end data gathering
1371
1372     output = []
1373     for node in nodelist:
1374       node_output = []
1375       for field in self.op.output_fields:
1376         if field == "name":
1377           val = node.name
1378         elif field == "pinst_list":
1379           val = list(node_to_primary[node.name])
1380         elif field == "sinst_list":
1381           val = list(node_to_secondary[node.name])
1382         elif field == "pinst_cnt":
1383           val = len(node_to_primary[node.name])
1384         elif field == "sinst_cnt":
1385           val = len(node_to_secondary[node.name])
1386         elif field == "pip":
1387           val = node.primary_ip
1388         elif field == "sip":
1389           val = node.secondary_ip
1390         elif field in self.dynamic_fields:
1391           val = live_data[node.name].get(field, None)
1392         else:
1393           raise errors.ParameterError(field)
1394         node_output.append(val)
1395       output.append(node_output)
1396
1397     return output
1398
1399
1400 class LUQueryNodeVolumes(NoHooksLU):
1401   """Logical unit for getting volumes on node(s).
1402
1403   """
1404   _OP_REQP = ["nodes", "output_fields"]
1405
1406   def CheckPrereq(self):
1407     """Check prerequisites.
1408
1409     This checks that the fields required are valid output fields.
1410
1411     """
1412     self.nodes = _GetWantedNodes(self, self.op.nodes)
1413
1414     _CheckOutputFields(static=["node"],
1415                        dynamic=["phys", "vg", "name", "size", "instance"],
1416                        selected=self.op.output_fields)
1417
1418
1419   def Exec(self, feedback_fn):
1420     """Computes the list of nodes and their attributes.
1421
1422     """
1423     nodenames = self.nodes
1424     volumes = rpc.call_node_volumes(nodenames)
1425
1426     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1427              in self.cfg.GetInstanceList()]
1428
1429     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1430
1431     output = []
1432     for node in nodenames:
1433       if node not in volumes or not volumes[node]:
1434         continue
1435
1436       node_vols = volumes[node][:]
1437       node_vols.sort(key=lambda vol: vol['dev'])
1438
1439       for vol in node_vols:
1440         node_output = []
1441         for field in self.op.output_fields:
1442           if field == "node":
1443             val = node
1444           elif field == "phys":
1445             val = vol['dev']
1446           elif field == "vg":
1447             val = vol['vg']
1448           elif field == "name":
1449             val = vol['name']
1450           elif field == "size":
1451             val = int(float(vol['size']))
1452           elif field == "instance":
1453             for inst in ilist:
1454               if node not in lv_by_node[inst]:
1455                 continue
1456               if vol['name'] in lv_by_node[inst][node]:
1457                 val = inst.name
1458                 break
1459             else:
1460               val = '-'
1461           else:
1462             raise errors.ParameterError(field)
1463           node_output.append(str(val))
1464
1465         output.append(node_output)
1466
1467     return output
1468
1469
1470 class LUAddNode(LogicalUnit):
1471   """Logical unit for adding node to the cluster.
1472
1473   """
1474   HPATH = "node-add"
1475   HTYPE = constants.HTYPE_NODE
1476   _OP_REQP = ["node_name"]
1477
1478   def BuildHooksEnv(self):
1479     """Build hooks env.
1480
1481     This will run on all nodes before, and on all nodes + the new node after.
1482
1483     """
1484     env = {
1485       "OP_TARGET": self.op.node_name,
1486       "NODE_NAME": self.op.node_name,
1487       "NODE_PIP": self.op.primary_ip,
1488       "NODE_SIP": self.op.secondary_ip,
1489       }
1490     nodes_0 = self.cfg.GetNodeList()
1491     nodes_1 = nodes_0 + [self.op.node_name, ]
1492     return env, nodes_0, nodes_1
1493
1494   def CheckPrereq(self):
1495     """Check prerequisites.
1496
1497     This checks:
1498      - the new node is not already in the config
1499      - it is resolvable
1500      - its parameters (single/dual homed) matches the cluster
1501
1502     Any errors are signalled by raising errors.OpPrereqError.
1503
1504     """
1505     node_name = self.op.node_name
1506     cfg = self.cfg
1507
1508     dns_data = utils.HostInfo(node_name)
1509
1510     node = dns_data.name
1511     primary_ip = self.op.primary_ip = dns_data.ip
1512     secondary_ip = getattr(self.op, "secondary_ip", None)
1513     if secondary_ip is None:
1514       secondary_ip = primary_ip
1515     if not utils.IsValidIP(secondary_ip):
1516       raise errors.OpPrereqError("Invalid secondary IP given")
1517     self.op.secondary_ip = secondary_ip
1518     node_list = cfg.GetNodeList()
1519     if node in node_list:
1520       raise errors.OpPrereqError("Node %s is already in the configuration"
1521                                  % node)
1522
1523     for existing_node_name in node_list:
1524       existing_node = cfg.GetNodeInfo(existing_node_name)
1525       if (existing_node.primary_ip == primary_ip or
1526           existing_node.secondary_ip == primary_ip or
1527           existing_node.primary_ip == secondary_ip or
1528           existing_node.secondary_ip == secondary_ip):
1529         raise errors.OpPrereqError("New node ip address(es) conflict with"
1530                                    " existing node %s" % existing_node.name)
1531
1532     # check that the type of the node (single versus dual homed) is the
1533     # same as for the master
1534     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1535     master_singlehomed = myself.secondary_ip == myself.primary_ip
1536     newbie_singlehomed = secondary_ip == primary_ip
1537     if master_singlehomed != newbie_singlehomed:
1538       if master_singlehomed:
1539         raise errors.OpPrereqError("The master has no private ip but the"
1540                                    " new node has one")
1541       else:
1542         raise errors.OpPrereqError("The master has a private ip but the"
1543                                    " new node doesn't have one")
1544
1545     # checks reachablity
1546     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1547       raise errors.OpPrereqError("Node not reachable by ping")
1548
1549     if not newbie_singlehomed:
1550       # check reachability from my secondary ip to newbie's secondary ip
1551       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1552                            source=myself.secondary_ip):
1553         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1554                                    " based ping to noded port")
1555
1556     self.new_node = objects.Node(name=node,
1557                                  primary_ip=primary_ip,
1558                                  secondary_ip=secondary_ip)
1559
1560     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1561       if not os.path.exists(constants.VNC_PASSWORD_FILE):
1562         raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1563                                    constants.VNC_PASSWORD_FILE)
1564
1565   def Exec(self, feedback_fn):
1566     """Adds the new node to the cluster.
1567
1568     """
1569     new_node = self.new_node
1570     node = new_node.name
1571
1572     # set up inter-node password and certificate and restarts the node daemon
1573     gntpass = self.sstore.GetNodeDaemonPassword()
1574     if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1575       raise errors.OpExecError("ganeti password corruption detected")
1576     f = open(constants.SSL_CERT_FILE)
1577     try:
1578       gntpem = f.read(8192)
1579     finally:
1580       f.close()
1581     # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1582     # so we use this to detect an invalid certificate; as long as the
1583     # cert doesn't contain this, the here-document will be correctly
1584     # parsed by the shell sequence below
1585     if re.search('^!EOF\.', gntpem, re.MULTILINE):
1586       raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1587     if not gntpem.endswith("\n"):
1588       raise errors.OpExecError("PEM must end with newline")
1589     logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1590
1591     # and then connect with ssh to set password and start ganeti-noded
1592     # note that all the below variables are sanitized at this point,
1593     # either by being constants or by the checks above
1594     ss = self.sstore
1595     mycommand = ("umask 077 && "
1596                  "echo '%s' > '%s' && "
1597                  "cat > '%s' << '!EOF.' && \n"
1598                  "%s!EOF.\n%s restart" %
1599                  (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1600                   constants.SSL_CERT_FILE, gntpem,
1601                   constants.NODE_INITD_SCRIPT))
1602
1603     result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1604     if result.failed:
1605       raise errors.OpExecError("Remote command on node %s, error: %s,"
1606                                " output: %s" %
1607                                (node, result.fail_reason, result.output))
1608
1609     # check connectivity
1610     time.sleep(4)
1611
1612     result = rpc.call_version([node])[node]
1613     if result:
1614       if constants.PROTOCOL_VERSION == result:
1615         logger.Info("communication to node %s fine, sw version %s match" %
1616                     (node, result))
1617       else:
1618         raise errors.OpExecError("Version mismatch master version %s,"
1619                                  " node version %s" %
1620                                  (constants.PROTOCOL_VERSION, result))
1621     else:
1622       raise errors.OpExecError("Cannot get version from the new node")
1623
1624     # setup ssh on node
1625     logger.Info("copy ssh key to node %s" % node)
1626     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1627     keyarray = []
1628     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1629                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1630                 priv_key, pub_key]
1631
1632     for i in keyfiles:
1633       f = open(i, 'r')
1634       try:
1635         keyarray.append(f.read())
1636       finally:
1637         f.close()
1638
1639     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1640                                keyarray[3], keyarray[4], keyarray[5])
1641
1642     if not result:
1643       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1644
1645     # Add node to our /etc/hosts, and add key to known_hosts
1646     _AddHostToEtcHosts(new_node.name)
1647
1648     _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1649                       self.cfg.GetHostKey())
1650
1651     if new_node.secondary_ip != new_node.primary_ip:
1652       if not rpc.call_node_tcp_ping(new_node.name,
1653                                     constants.LOCALHOST_IP_ADDRESS,
1654                                     new_node.secondary_ip,
1655                                     constants.DEFAULT_NODED_PORT,
1656                                     10, False):
1657         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1658                                  " you gave (%s). Please fix and re-run this"
1659                                  " command." % new_node.secondary_ip)
1660
1661     success, msg = ssh.VerifyNodeHostname(node)
1662     if not success:
1663       raise errors.OpExecError("Node '%s' claims it has a different hostname"
1664                                " than the one the resolver gives: %s."
1665                                " Please fix and re-run this command." %
1666                                (node, msg))
1667
1668     # Distribute updated /etc/hosts and known_hosts to all nodes,
1669     # including the node just added
1670     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1671     dist_nodes = self.cfg.GetNodeList() + [node]
1672     if myself.name in dist_nodes:
1673       dist_nodes.remove(myself.name)
1674
1675     logger.Debug("Copying hosts and known_hosts to all nodes")
1676     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1677       result = rpc.call_upload_file(dist_nodes, fname)
1678       for to_node in dist_nodes:
1679         if not result[to_node]:
1680           logger.Error("copy of file %s to node %s failed" %
1681                        (fname, to_node))
1682
1683     to_copy = ss.GetFileList()
1684     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1685       to_copy.append(constants.VNC_PASSWORD_FILE)
1686     for fname in to_copy:
1687       if not ssh.CopyFileToNode(node, fname):
1688         logger.Error("could not copy file %s to node %s" % (fname, node))
1689
1690     logger.Info("adding node %s to cluster.conf" % node)
1691     self.cfg.AddNode(new_node)
1692
1693
1694 class LUMasterFailover(LogicalUnit):
1695   """Failover the master node to the current node.
1696
1697   This is a special LU in that it must run on a non-master node.
1698
1699   """
1700   HPATH = "master-failover"
1701   HTYPE = constants.HTYPE_CLUSTER
1702   REQ_MASTER = False
1703   _OP_REQP = []
1704
1705   def BuildHooksEnv(self):
1706     """Build hooks env.
1707
1708     This will run on the new master only in the pre phase, and on all
1709     the nodes in the post phase.
1710
1711     """
1712     env = {
1713       "OP_TARGET": self.new_master,
1714       "NEW_MASTER": self.new_master,
1715       "OLD_MASTER": self.old_master,
1716       }
1717     return env, [self.new_master], self.cfg.GetNodeList()
1718
1719   def CheckPrereq(self):
1720     """Check prerequisites.
1721
1722     This checks that we are not already the master.
1723
1724     """
1725     self.new_master = utils.HostInfo().name
1726     self.old_master = self.sstore.GetMasterNode()
1727
1728     if self.old_master == self.new_master:
1729       raise errors.OpPrereqError("This commands must be run on the node"
1730                                  " where you want the new master to be."
1731                                  " %s is already the master" %
1732                                  self.old_master)
1733
1734   def Exec(self, feedback_fn):
1735     """Failover the master node.
1736
1737     This command, when run on a non-master node, will cause the current
1738     master to cease being master, and the non-master to become new
1739     master.
1740
1741     """
1742     #TODO: do not rely on gethostname returning the FQDN
1743     logger.Info("setting master to %s, old master: %s" %
1744                 (self.new_master, self.old_master))
1745
1746     if not rpc.call_node_stop_master(self.old_master):
1747       logger.Error("could disable the master role on the old master"
1748                    " %s, please disable manually" % self.old_master)
1749
1750     ss = self.sstore
1751     ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1752     if not rpc.call_upload_file(self.cfg.GetNodeList(),
1753                                 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1754       logger.Error("could not distribute the new simple store master file"
1755                    " to the other nodes, please check.")
1756
1757     if not rpc.call_node_start_master(self.new_master):
1758       logger.Error("could not start the master role on the new master"
1759                    " %s, please check" % self.new_master)
1760       feedback_fn("Error in activating the master IP on the new master,"
1761                   " please fix manually.")
1762
1763
1764
1765 class LUQueryClusterInfo(NoHooksLU):
1766   """Query cluster configuration.
1767
1768   """
1769   _OP_REQP = []
1770   REQ_MASTER = False
1771
1772   def CheckPrereq(self):
1773     """No prerequsites needed for this LU.
1774
1775     """
1776     pass
1777
1778   def Exec(self, feedback_fn):
1779     """Return cluster config.
1780
1781     """
1782     result = {
1783       "name": self.sstore.GetClusterName(),
1784       "software_version": constants.RELEASE_VERSION,
1785       "protocol_version": constants.PROTOCOL_VERSION,
1786       "config_version": constants.CONFIG_VERSION,
1787       "os_api_version": constants.OS_API_VERSION,
1788       "export_version": constants.EXPORT_VERSION,
1789       "master": self.sstore.GetMasterNode(),
1790       "architecture": (platform.architecture()[0], platform.machine()),
1791       }
1792
1793     return result
1794
1795
1796 class LUClusterCopyFile(NoHooksLU):
1797   """Copy file to cluster.
1798
1799   """
1800   _OP_REQP = ["nodes", "filename"]
1801
1802   def CheckPrereq(self):
1803     """Check prerequisites.
1804
1805     It should check that the named file exists and that the given list
1806     of nodes is valid.
1807
1808     """
1809     if not os.path.exists(self.op.filename):
1810       raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1811
1812     self.nodes = _GetWantedNodes(self, self.op.nodes)
1813
1814   def Exec(self, feedback_fn):
1815     """Copy a file from master to some nodes.
1816
1817     Args:
1818       opts - class with options as members
1819       args - list containing a single element, the file name
1820     Opts used:
1821       nodes - list containing the name of target nodes; if empty, all nodes
1822
1823     """
1824     filename = self.op.filename
1825
1826     myname = utils.HostInfo().name
1827
1828     for node in self.nodes:
1829       if node == myname:
1830         continue
1831       if not ssh.CopyFileToNode(node, filename):
1832         logger.Error("Copy of file %s to node %s failed" % (filename, node))
1833
1834
1835 class LUDumpClusterConfig(NoHooksLU):
1836   """Return a text-representation of the cluster-config.
1837
1838   """
1839   _OP_REQP = []
1840
1841   def CheckPrereq(self):
1842     """No prerequisites.
1843
1844     """
1845     pass
1846
1847   def Exec(self, feedback_fn):
1848     """Dump a representation of the cluster config to the standard output.
1849
1850     """
1851     return self.cfg.DumpConfig()
1852
1853
1854 class LURunClusterCommand(NoHooksLU):
1855   """Run a command on some nodes.
1856
1857   """
1858   _OP_REQP = ["command", "nodes"]
1859
1860   def CheckPrereq(self):
1861     """Check prerequisites.
1862
1863     It checks that the given list of nodes is valid.
1864
1865     """
1866     self.nodes = _GetWantedNodes(self, self.op.nodes)
1867
1868   def Exec(self, feedback_fn):
1869     """Run a command on some nodes.
1870
1871     """
1872     # put the master at the end of the nodes list
1873     master_node = self.sstore.GetMasterNode()
1874     if master_node in self.nodes:
1875       self.nodes.remove(master_node)
1876       self.nodes.append(master_node)
1877
1878     data = []
1879     for node in self.nodes:
1880       result = ssh.SSHCall(node, "root", self.op.command)
1881       data.append((node, result.output, result.exit_code))
1882
1883     return data
1884
1885
1886 class LUActivateInstanceDisks(NoHooksLU):
1887   """Bring up an instance's disks.
1888
1889   """
1890   _OP_REQP = ["instance_name"]
1891
1892   def CheckPrereq(self):
1893     """Check prerequisites.
1894
1895     This checks that the instance is in the cluster.
1896
1897     """
1898     instance = self.cfg.GetInstanceInfo(
1899       self.cfg.ExpandInstanceName(self.op.instance_name))
1900     if instance is None:
1901       raise errors.OpPrereqError("Instance '%s' not known" %
1902                                  self.op.instance_name)
1903     self.instance = instance
1904
1905
1906   def Exec(self, feedback_fn):
1907     """Activate the disks.
1908
1909     """
1910     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1911     if not disks_ok:
1912       raise errors.OpExecError("Cannot activate block devices")
1913
1914     return disks_info
1915
1916
1917 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1918   """Prepare the block devices for an instance.
1919
1920   This sets up the block devices on all nodes.
1921
1922   Args:
1923     instance: a ganeti.objects.Instance object
1924     ignore_secondaries: if true, errors on secondary nodes won't result
1925                         in an error return from the function
1926
1927   Returns:
1928     false if the operation failed
1929     list of (host, instance_visible_name, node_visible_name) if the operation
1930          suceeded with the mapping from node devices to instance devices
1931   """
1932   device_info = []
1933   disks_ok = True
1934   iname = instance.name
1935   # With the two passes mechanism we try to reduce the window of
1936   # opportunity for the race condition of switching DRBD to primary
1937   # before handshaking occured, but we do not eliminate it
1938
1939   # The proper fix would be to wait (with some limits) until the
1940   # connection has been made and drbd transitions from WFConnection
1941   # into any other network-connected state (Connected, SyncTarget,
1942   # SyncSource, etc.)
1943
1944   # 1st pass, assemble on all nodes in secondary mode
1945   for inst_disk in instance.disks:
1946     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1947       cfg.SetDiskID(node_disk, node)
1948       result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1949       if not result:
1950         logger.Error("could not prepare block device %s on node %s"
1951                      " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1952         if not ignore_secondaries:
1953           disks_ok = False
1954
1955   # FIXME: race condition on drbd migration to primary
1956
1957   # 2nd pass, do only the primary node
1958   for inst_disk in instance.disks:
1959     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1960       if node != instance.primary_node:
1961         continue
1962       cfg.SetDiskID(node_disk, node)
1963       result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1964       if not result:
1965         logger.Error("could not prepare block device %s on node %s"
1966                      " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1967         disks_ok = False
1968     device_info.append((instance.primary_node, inst_disk.iv_name, result))
1969
1970   # leave the disks configured for the primary node
1971   # this is a workaround that would be fixed better by
1972   # improving the logical/physical id handling
1973   for disk in instance.disks:
1974     cfg.SetDiskID(disk, instance.primary_node)
1975
1976   return disks_ok, device_info
1977
1978
1979 def _StartInstanceDisks(cfg, instance, force):
1980   """Start the disks of an instance.
1981
1982   """
1983   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1984                                            ignore_secondaries=force)
1985   if not disks_ok:
1986     _ShutdownInstanceDisks(instance, cfg)
1987     if force is not None and not force:
1988       logger.Error("If the message above refers to a secondary node,"
1989                    " you can retry the operation using '--force'.")
1990     raise errors.OpExecError("Disk consistency error")
1991
1992
1993 class LUDeactivateInstanceDisks(NoHooksLU):
1994   """Shutdown an instance's disks.
1995
1996   """
1997   _OP_REQP = ["instance_name"]
1998
1999   def CheckPrereq(self):
2000     """Check prerequisites.
2001
2002     This checks that the instance is in the cluster.
2003
2004     """
2005     instance = self.cfg.GetInstanceInfo(
2006       self.cfg.ExpandInstanceName(self.op.instance_name))
2007     if instance is None:
2008       raise errors.OpPrereqError("Instance '%s' not known" %
2009                                  self.op.instance_name)
2010     self.instance = instance
2011
2012   def Exec(self, feedback_fn):
2013     """Deactivate the disks
2014
2015     """
2016     instance = self.instance
2017     ins_l = rpc.call_instance_list([instance.primary_node])
2018     ins_l = ins_l[instance.primary_node]
2019     if not type(ins_l) is list:
2020       raise errors.OpExecError("Can't contact node '%s'" %
2021                                instance.primary_node)
2022
2023     if self.instance.name in ins_l:
2024       raise errors.OpExecError("Instance is running, can't shutdown"
2025                                " block devices.")
2026
2027     _ShutdownInstanceDisks(instance, self.cfg)
2028
2029
2030 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
2031   """Shutdown block devices of an instance.
2032
2033   This does the shutdown on all nodes of the instance.
2034
2035   If the ignore_primary is false, errors on the primary node are
2036   ignored.
2037
2038   """
2039   result = True
2040   for disk in instance.disks:
2041     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2042       cfg.SetDiskID(top_disk, node)
2043       if not rpc.call_blockdev_shutdown(node, top_disk):
2044         logger.Error("could not shutdown block device %s on node %s" %
2045                      (disk.iv_name, node))
2046         if not ignore_primary or node != instance.primary_node:
2047           result = False
2048   return result
2049
2050
2051 def _CheckNodeFreeMemory(cfg, node, reason, requested):
2052   """Checks if a node has enough free memory.
2053
2054   This function check if a given node has the needed amount of free
2055   memory. In case the node has less memory or we cannot get the
2056   information from the node, this function raise an OpPrereqError
2057   exception.
2058
2059   Args:
2060     - cfg: a ConfigWriter instance
2061     - node: the node name
2062     - reason: string to use in the error message
2063     - requested: the amount of memory in MiB
2064
2065   """
2066   nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
2067   if not nodeinfo or not isinstance(nodeinfo, dict):
2068     raise errors.OpPrereqError("Could not contact node %s for resource"
2069                              " information" % (node,))
2070
2071   free_mem = nodeinfo[node].get('memory_free')
2072   if not isinstance(free_mem, int):
2073     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2074                              " was '%s'" % (node, free_mem))
2075   if requested > free_mem:
2076     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2077                              " needed %s MiB, available %s MiB" %
2078                              (node, reason, requested, free_mem))
2079
2080
2081 class LUStartupInstance(LogicalUnit):
2082   """Starts an instance.
2083
2084   """
2085   HPATH = "instance-start"
2086   HTYPE = constants.HTYPE_INSTANCE
2087   _OP_REQP = ["instance_name", "force"]
2088
2089   def BuildHooksEnv(self):
2090     """Build hooks env.
2091
2092     This runs on master, primary and secondary nodes of the instance.
2093
2094     """
2095     env = {
2096       "FORCE": self.op.force,
2097       }
2098     env.update(_BuildInstanceHookEnvByObject(self.instance))
2099     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2100           list(self.instance.secondary_nodes))
2101     return env, nl, nl
2102
2103   def CheckPrereq(self):
2104     """Check prerequisites.
2105
2106     This checks that the instance is in the cluster.
2107
2108     """
2109     instance = self.cfg.GetInstanceInfo(
2110       self.cfg.ExpandInstanceName(self.op.instance_name))
2111     if instance is None:
2112       raise errors.OpPrereqError("Instance '%s' not known" %
2113                                  self.op.instance_name)
2114
2115     # check bridges existance
2116     _CheckInstanceBridgesExist(instance)
2117
2118     _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2119                          "starting instance %s" % instance.name,
2120                          instance.memory)
2121
2122     self.instance = instance
2123     self.op.instance_name = instance.name
2124
2125   def Exec(self, feedback_fn):
2126     """Start the instance.
2127
2128     """
2129     instance = self.instance
2130     force = self.op.force
2131     extra_args = getattr(self.op, "extra_args", "")
2132
2133     self.cfg.MarkInstanceUp(instance.name)
2134
2135     node_current = instance.primary_node
2136
2137     _StartInstanceDisks(self.cfg, instance, force)
2138
2139     if not rpc.call_instance_start(node_current, instance, extra_args):
2140       _ShutdownInstanceDisks(instance, self.cfg)
2141       raise errors.OpExecError("Could not start instance")
2142
2143
2144 class LURebootInstance(LogicalUnit):
2145   """Reboot an instance.
2146
2147   """
2148   HPATH = "instance-reboot"
2149   HTYPE = constants.HTYPE_INSTANCE
2150   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2151
2152   def BuildHooksEnv(self):
2153     """Build hooks env.
2154
2155     This runs on master, primary and secondary nodes of the instance.
2156
2157     """
2158     env = {
2159       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2160       }
2161     env.update(_BuildInstanceHookEnvByObject(self.instance))
2162     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2163           list(self.instance.secondary_nodes))
2164     return env, nl, nl
2165
2166   def CheckPrereq(self):
2167     """Check prerequisites.
2168
2169     This checks that the instance is in the cluster.
2170
2171     """
2172     instance = self.cfg.GetInstanceInfo(
2173       self.cfg.ExpandInstanceName(self.op.instance_name))
2174     if instance is None:
2175       raise errors.OpPrereqError("Instance '%s' not known" %
2176                                  self.op.instance_name)
2177
2178     # check bridges existance
2179     _CheckInstanceBridgesExist(instance)
2180
2181     self.instance = instance
2182     self.op.instance_name = instance.name
2183
2184   def Exec(self, feedback_fn):
2185     """Reboot the instance.
2186
2187     """
2188     instance = self.instance
2189     ignore_secondaries = self.op.ignore_secondaries
2190     reboot_type = self.op.reboot_type
2191     extra_args = getattr(self.op, "extra_args", "")
2192
2193     node_current = instance.primary_node
2194
2195     if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2196                            constants.INSTANCE_REBOOT_HARD,
2197                            constants.INSTANCE_REBOOT_FULL]:
2198       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2199                                   (constants.INSTANCE_REBOOT_SOFT,
2200                                    constants.INSTANCE_REBOOT_HARD,
2201                                    constants.INSTANCE_REBOOT_FULL))
2202
2203     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2204                        constants.INSTANCE_REBOOT_HARD]:
2205       if not rpc.call_instance_reboot(node_current, instance,
2206                                       reboot_type, extra_args):
2207         raise errors.OpExecError("Could not reboot instance")
2208     else:
2209       if not rpc.call_instance_shutdown(node_current, instance):
2210         raise errors.OpExecError("could not shutdown instance for full reboot")
2211       _ShutdownInstanceDisks(instance, self.cfg)
2212       _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2213       if not rpc.call_instance_start(node_current, instance, extra_args):
2214         _ShutdownInstanceDisks(instance, self.cfg)
2215         raise errors.OpExecError("Could not start instance for full reboot")
2216
2217     self.cfg.MarkInstanceUp(instance.name)
2218
2219
2220 class LUShutdownInstance(LogicalUnit):
2221   """Shutdown an instance.
2222
2223   """
2224   HPATH = "instance-stop"
2225   HTYPE = constants.HTYPE_INSTANCE
2226   _OP_REQP = ["instance_name"]
2227
2228   def BuildHooksEnv(self):
2229     """Build hooks env.
2230
2231     This runs on master, primary and secondary nodes of the instance.
2232
2233     """
2234     env = _BuildInstanceHookEnvByObject(self.instance)
2235     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2236           list(self.instance.secondary_nodes))
2237     return env, nl, nl
2238
2239   def CheckPrereq(self):
2240     """Check prerequisites.
2241
2242     This checks that the instance is in the cluster.
2243
2244     """
2245     instance = self.cfg.GetInstanceInfo(
2246       self.cfg.ExpandInstanceName(self.op.instance_name))
2247     if instance is None:
2248       raise errors.OpPrereqError("Instance '%s' not known" %
2249                                  self.op.instance_name)
2250     self.instance = instance
2251
2252   def Exec(self, feedback_fn):
2253     """Shutdown the instance.
2254
2255     """
2256     instance = self.instance
2257     node_current = instance.primary_node
2258     self.cfg.MarkInstanceDown(instance.name)
2259     if not rpc.call_instance_shutdown(node_current, instance):
2260       logger.Error("could not shutdown instance")
2261
2262     _ShutdownInstanceDisks(instance, self.cfg)
2263
2264
2265 class LUReinstallInstance(LogicalUnit):
2266   """Reinstall an instance.
2267
2268   """
2269   HPATH = "instance-reinstall"
2270   HTYPE = constants.HTYPE_INSTANCE
2271   _OP_REQP = ["instance_name"]
2272
2273   def BuildHooksEnv(self):
2274     """Build hooks env.
2275
2276     This runs on master, primary and secondary nodes of the instance.
2277
2278     """
2279     env = _BuildInstanceHookEnvByObject(self.instance)
2280     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2281           list(self.instance.secondary_nodes))
2282     return env, nl, nl
2283
2284   def CheckPrereq(self):
2285     """Check prerequisites.
2286
2287     This checks that the instance is in the cluster and is not running.
2288
2289     """
2290     instance = self.cfg.GetInstanceInfo(
2291       self.cfg.ExpandInstanceName(self.op.instance_name))
2292     if instance is None:
2293       raise errors.OpPrereqError("Instance '%s' not known" %
2294                                  self.op.instance_name)
2295     if instance.disk_template == constants.DT_DISKLESS:
2296       raise errors.OpPrereqError("Instance '%s' has no disks" %
2297                                  self.op.instance_name)
2298     if instance.status != "down":
2299       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2300                                  self.op.instance_name)
2301     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2302     if remote_info:
2303       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2304                                  (self.op.instance_name,
2305                                   instance.primary_node))
2306
2307     self.op.os_type = getattr(self.op, "os_type", None)
2308     if self.op.os_type is not None:
2309       # OS verification
2310       pnode = self.cfg.GetNodeInfo(
2311         self.cfg.ExpandNodeName(instance.primary_node))
2312       if pnode is None:
2313         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2314                                    self.op.pnode)
2315       os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2316       if not os_obj:
2317         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2318                                    " primary node"  % self.op.os_type)
2319
2320     self.instance = instance
2321
2322   def Exec(self, feedback_fn):
2323     """Reinstall the instance.
2324
2325     """
2326     inst = self.instance
2327
2328     if self.op.os_type is not None:
2329       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2330       inst.os = self.op.os_type
2331       self.cfg.AddInstance(inst)
2332
2333     _StartInstanceDisks(self.cfg, inst, None)
2334     try:
2335       feedback_fn("Running the instance OS create scripts...")
2336       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2337         raise errors.OpExecError("Could not install OS for instance %s"
2338                                  " on node %s" %
2339                                  (inst.name, inst.primary_node))
2340     finally:
2341       _ShutdownInstanceDisks(inst, self.cfg)
2342
2343
2344 class LURenameInstance(LogicalUnit):
2345   """Rename an instance.
2346
2347   """
2348   HPATH = "instance-rename"
2349   HTYPE = constants.HTYPE_INSTANCE
2350   _OP_REQP = ["instance_name", "new_name"]
2351
2352   def BuildHooksEnv(self):
2353     """Build hooks env.
2354
2355     This runs on master, primary and secondary nodes of the instance.
2356
2357     """
2358     env = _BuildInstanceHookEnvByObject(self.instance)
2359     env["INSTANCE_NEW_NAME"] = self.op.new_name
2360     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2361           list(self.instance.secondary_nodes))
2362     return env, nl, nl
2363
2364   def CheckPrereq(self):
2365     """Check prerequisites.
2366
2367     This checks that the instance is in the cluster and is not running.
2368
2369     """
2370     instance = self.cfg.GetInstanceInfo(
2371       self.cfg.ExpandInstanceName(self.op.instance_name))
2372     if instance is None:
2373       raise errors.OpPrereqError("Instance '%s' not known" %
2374                                  self.op.instance_name)
2375     if instance.status != "down":
2376       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2377                                  self.op.instance_name)
2378     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2379     if remote_info:
2380       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2381                                  (self.op.instance_name,
2382                                   instance.primary_node))
2383     self.instance = instance
2384
2385     # new name verification
2386     name_info = utils.HostInfo(self.op.new_name)
2387
2388     self.op.new_name = new_name = name_info.name
2389     instance_list = self.cfg.GetInstanceList()
2390     if new_name in instance_list:
2391       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2392                                  instance_name)
2393
2394     if not getattr(self.op, "ignore_ip", False):
2395       command = ["fping", "-q", name_info.ip]
2396       result = utils.RunCmd(command)
2397       if not result.failed:
2398         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2399                                    (name_info.ip, new_name))
2400
2401
2402   def Exec(self, feedback_fn):
2403     """Reinstall the instance.
2404
2405     """
2406     inst = self.instance
2407     old_name = inst.name
2408
2409     self.cfg.RenameInstance(inst.name, self.op.new_name)
2410
2411     # re-read the instance from the configuration after rename
2412     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2413
2414     _StartInstanceDisks(self.cfg, inst, None)
2415     try:
2416       if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2417                                           "sda", "sdb"):
2418         msg = ("Could run OS rename script for instance %s on node %s (but the"
2419                " instance has been renamed in Ganeti)" %
2420                (inst.name, inst.primary_node))
2421         logger.Error(msg)
2422     finally:
2423       _ShutdownInstanceDisks(inst, self.cfg)
2424
2425
2426 class LURemoveInstance(LogicalUnit):
2427   """Remove an instance.
2428
2429   """
2430   HPATH = "instance-remove"
2431   HTYPE = constants.HTYPE_INSTANCE
2432   _OP_REQP = ["instance_name"]
2433
2434   def BuildHooksEnv(self):
2435     """Build hooks env.
2436
2437     This runs on master, primary and secondary nodes of the instance.
2438
2439     """
2440     env = _BuildInstanceHookEnvByObject(self.instance)
2441     nl = [self.sstore.GetMasterNode()]
2442     return env, nl, nl
2443
2444   def CheckPrereq(self):
2445     """Check prerequisites.
2446
2447     This checks that the instance is in the cluster.
2448
2449     """
2450     instance = self.cfg.GetInstanceInfo(
2451       self.cfg.ExpandInstanceName(self.op.instance_name))
2452     if instance is None:
2453       raise errors.OpPrereqError("Instance '%s' not known" %
2454                                  self.op.instance_name)
2455     self.instance = instance
2456
2457   def Exec(self, feedback_fn):
2458     """Remove the instance.
2459
2460     """
2461     instance = self.instance
2462     logger.Info("shutting down instance %s on node %s" %
2463                 (instance.name, instance.primary_node))
2464
2465     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2466       if self.op.ignore_failures:
2467         feedback_fn("Warning: can't shutdown instance")
2468       else:
2469         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2470                                  (instance.name, instance.primary_node))
2471
2472     logger.Info("removing block devices for instance %s" % instance.name)
2473
2474     if not _RemoveDisks(instance, self.cfg):
2475       if self.op.ignore_failures:
2476         feedback_fn("Warning: can't remove instance's disks")
2477       else:
2478         raise errors.OpExecError("Can't remove instance's disks")
2479
2480     logger.Info("removing instance %s out of cluster config" % instance.name)
2481
2482     self.cfg.RemoveInstance(instance.name)
2483
2484
2485 class LUQueryInstances(NoHooksLU):
2486   """Logical unit for querying instances.
2487
2488   """
2489   _OP_REQP = ["output_fields", "names"]
2490
2491   def CheckPrereq(self):
2492     """Check prerequisites.
2493
2494     This checks that the fields required are valid output fields.
2495
2496     """
2497     self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2498     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2499                                "admin_state", "admin_ram",
2500                                "disk_template", "ip", "mac", "bridge",
2501                                "sda_size", "sdb_size", "vcpus"],
2502                        dynamic=self.dynamic_fields,
2503                        selected=self.op.output_fields)
2504
2505     self.wanted = _GetWantedInstances(self, self.op.names)
2506
2507   def Exec(self, feedback_fn):
2508     """Computes the list of nodes and their attributes.
2509
2510     """
2511     instance_names = self.wanted
2512     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2513                      in instance_names]
2514
2515     # begin data gathering
2516
2517     nodes = frozenset([inst.primary_node for inst in instance_list])
2518
2519     bad_nodes = []
2520     if self.dynamic_fields.intersection(self.op.output_fields):
2521       live_data = {}
2522       node_data = rpc.call_all_instances_info(nodes)
2523       for name in nodes:
2524         result = node_data[name]
2525         if result:
2526           live_data.update(result)
2527         elif result == False:
2528           bad_nodes.append(name)
2529         # else no instance is alive
2530     else:
2531       live_data = dict([(name, {}) for name in instance_names])
2532
2533     # end data gathering
2534
2535     output = []
2536     for instance in instance_list:
2537       iout = []
2538       for field in self.op.output_fields:
2539         if field == "name":
2540           val = instance.name
2541         elif field == "os":
2542           val = instance.os
2543         elif field == "pnode":
2544           val = instance.primary_node
2545         elif field == "snodes":
2546           val = list(instance.secondary_nodes)
2547         elif field == "admin_state":
2548           val = (instance.status != "down")
2549         elif field == "oper_state":
2550           if instance.primary_node in bad_nodes:
2551             val = None
2552           else:
2553             val = bool(live_data.get(instance.name))
2554         elif field == "status":
2555           if instance.primary_node in bad_nodes:
2556             val = "ERROR_nodedown"
2557           else:
2558             running = bool(live_data.get(instance.name))
2559             if running:
2560               if instance.status != "down":
2561                 val = "running"
2562               else:
2563                 val = "ERROR_up"
2564             else:
2565               if instance.status != "down":
2566                 val = "ERROR_down"
2567               else:
2568                 val = "ADMIN_down"
2569         elif field == "admin_ram":
2570           val = instance.memory
2571         elif field == "oper_ram":
2572           if instance.primary_node in bad_nodes:
2573             val = None
2574           elif instance.name in live_data:
2575             val = live_data[instance.name].get("memory", "?")
2576           else:
2577             val = "-"
2578         elif field == "disk_template":
2579           val = instance.disk_template
2580         elif field == "ip":
2581           val = instance.nics[0].ip
2582         elif field == "bridge":
2583           val = instance.nics[0].bridge
2584         elif field == "mac":
2585           val = instance.nics[0].mac
2586         elif field == "sda_size" or field == "sdb_size":
2587           disk = instance.FindDisk(field[:3])
2588           if disk is None:
2589             val = None
2590           else:
2591             val = disk.size
2592         elif field == "vcpus":
2593           val = instance.vcpus
2594         else:
2595           raise errors.ParameterError(field)
2596         iout.append(val)
2597       output.append(iout)
2598
2599     return output
2600
2601
2602 class LUFailoverInstance(LogicalUnit):
2603   """Failover an instance.
2604
2605   """
2606   HPATH = "instance-failover"
2607   HTYPE = constants.HTYPE_INSTANCE
2608   _OP_REQP = ["instance_name", "ignore_consistency"]
2609
2610   def BuildHooksEnv(self):
2611     """Build hooks env.
2612
2613     This runs on master, primary and secondary nodes of the instance.
2614
2615     """
2616     env = {
2617       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2618       }
2619     env.update(_BuildInstanceHookEnvByObject(self.instance))
2620     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2621     return env, nl, nl
2622
2623   def CheckPrereq(self):
2624     """Check prerequisites.
2625
2626     This checks that the instance is in the cluster.
2627
2628     """
2629     instance = self.cfg.GetInstanceInfo(
2630       self.cfg.ExpandInstanceName(self.op.instance_name))
2631     if instance is None:
2632       raise errors.OpPrereqError("Instance '%s' not known" %
2633                                  self.op.instance_name)
2634
2635     if instance.disk_template not in constants.DTS_NET_MIRROR:
2636       raise errors.OpPrereqError("Instance's disk layout is not"
2637                                  " network mirrored, cannot failover.")
2638
2639     secondary_nodes = instance.secondary_nodes
2640     if not secondary_nodes:
2641       raise errors.ProgrammerError("no secondary node but using "
2642                                    "DT_REMOTE_RAID1 template")
2643
2644     target_node = secondary_nodes[0]
2645     # check memory requirements on the secondary node
2646     _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2647                          instance.name, instance.memory)
2648
2649     # check bridge existance
2650     brlist = [nic.bridge for nic in instance.nics]
2651     if not rpc.call_bridges_exist(target_node, brlist):
2652       raise errors.OpPrereqError("One or more target bridges %s does not"
2653                                  " exist on destination node '%s'" %
2654                                  (brlist, target_node))
2655
2656     self.instance = instance
2657
2658   def Exec(self, feedback_fn):
2659     """Failover an instance.
2660
2661     The failover is done by shutting it down on its present node and
2662     starting it on the secondary.
2663
2664     """
2665     instance = self.instance
2666
2667     source_node = instance.primary_node
2668     target_node = instance.secondary_nodes[0]
2669
2670     feedback_fn("* checking disk consistency between source and target")
2671     for dev in instance.disks:
2672       # for remote_raid1, these are md over drbd
2673       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2674         if instance.status == "up" and not self.op.ignore_consistency:
2675           raise errors.OpExecError("Disk %s is degraded on target node,"
2676                                    " aborting failover." % dev.iv_name)
2677
2678     feedback_fn("* shutting down instance on source node")
2679     logger.Info("Shutting down instance %s on node %s" %
2680                 (instance.name, source_node))
2681
2682     if not rpc.call_instance_shutdown(source_node, instance):
2683       if self.op.ignore_consistency:
2684         logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2685                      " anyway. Please make sure node %s is down"  %
2686                      (instance.name, source_node, source_node))
2687       else:
2688         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2689                                  (instance.name, source_node))
2690
2691     feedback_fn("* deactivating the instance's disks on source node")
2692     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2693       raise errors.OpExecError("Can't shut down the instance's disks.")
2694
2695     instance.primary_node = target_node
2696     # distribute new instance config to the other nodes
2697     self.cfg.AddInstance(instance)
2698
2699     # Only start the instance if it's marked as up
2700     if instance.status == "up":
2701       feedback_fn("* activating the instance's disks on target node")
2702       logger.Info("Starting instance %s on node %s" %
2703                   (instance.name, target_node))
2704
2705       disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2706                                                ignore_secondaries=True)
2707       if not disks_ok:
2708         _ShutdownInstanceDisks(instance, self.cfg)
2709         raise errors.OpExecError("Can't activate the instance's disks")
2710
2711       feedback_fn("* starting the instance on the target node")
2712       if not rpc.call_instance_start(target_node, instance, None):
2713         _ShutdownInstanceDisks(instance, self.cfg)
2714         raise errors.OpExecError("Could not start instance %s on node %s." %
2715                                  (instance.name, target_node))
2716
2717
2718 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2719   """Create a tree of block devices on the primary node.
2720
2721   This always creates all devices.
2722
2723   """
2724   if device.children:
2725     for child in device.children:
2726       if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2727         return False
2728
2729   cfg.SetDiskID(device, node)
2730   new_id = rpc.call_blockdev_create(node, device, device.size,
2731                                     instance.name, True, info)
2732   if not new_id:
2733     return False
2734   if device.physical_id is None:
2735     device.physical_id = new_id
2736   return True
2737
2738
2739 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2740   """Create a tree of block devices on a secondary node.
2741
2742   If this device type has to be created on secondaries, create it and
2743   all its children.
2744
2745   If not, just recurse to children keeping the same 'force' value.
2746
2747   """
2748   if device.CreateOnSecondary():
2749     force = True
2750   if device.children:
2751     for child in device.children:
2752       if not _CreateBlockDevOnSecondary(cfg, node, instance,
2753                                         child, force, info):
2754         return False
2755
2756   if not force:
2757     return True
2758   cfg.SetDiskID(device, node)
2759   new_id = rpc.call_blockdev_create(node, device, device.size,
2760                                     instance.name, False, info)
2761   if not new_id:
2762     return False
2763   if device.physical_id is None:
2764     device.physical_id = new_id
2765   return True
2766
2767
2768 def _GenerateUniqueNames(cfg, exts):
2769   """Generate a suitable LV name.
2770
2771   This will generate a logical volume name for the given instance.
2772
2773   """
2774   results = []
2775   for val in exts:
2776     new_id = cfg.GenerateUniqueID()
2777     results.append("%s%s" % (new_id, val))
2778   return results
2779
2780
2781 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2782   """Generate a drbd device complete with its children.
2783
2784   """
2785   port = cfg.AllocatePort()
2786   vgname = cfg.GetVGName()
2787   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2788                           logical_id=(vgname, names[0]))
2789   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2790                           logical_id=(vgname, names[1]))
2791   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2792                           logical_id = (primary, secondary, port),
2793                           children = [dev_data, dev_meta])
2794   return drbd_dev
2795
2796
2797 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2798   """Generate a drbd8 device complete with its children.
2799
2800   """
2801   port = cfg.AllocatePort()
2802   vgname = cfg.GetVGName()
2803   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2804                           logical_id=(vgname, names[0]))
2805   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2806                           logical_id=(vgname, names[1]))
2807   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2808                           logical_id = (primary, secondary, port),
2809                           children = [dev_data, dev_meta],
2810                           iv_name=iv_name)
2811   return drbd_dev
2812
2813 def _GenerateDiskTemplate(cfg, template_name,
2814                           instance_name, primary_node,
2815                           secondary_nodes, disk_sz, swap_sz):
2816   """Generate the entire disk layout for a given template type.
2817
2818   """
2819   #TODO: compute space requirements
2820
2821   vgname = cfg.GetVGName()
2822   if template_name == constants.DT_DISKLESS:
2823     disks = []
2824   elif template_name == constants.DT_PLAIN:
2825     if len(secondary_nodes) != 0:
2826       raise errors.ProgrammerError("Wrong template configuration")
2827
2828     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2829     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2830                            logical_id=(vgname, names[0]),
2831                            iv_name = "sda")
2832     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2833                            logical_id=(vgname, names[1]),
2834                            iv_name = "sdb")
2835     disks = [sda_dev, sdb_dev]
2836   elif template_name == constants.DT_LOCAL_RAID1:
2837     if len(secondary_nodes) != 0:
2838       raise errors.ProgrammerError("Wrong template configuration")
2839
2840
2841     names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2842                                        ".sdb_m1", ".sdb_m2"])
2843     sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2844                               logical_id=(vgname, names[0]))
2845     sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2846                               logical_id=(vgname, names[1]))
2847     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
2848                               size=disk_sz,
2849                               children = [sda_dev_m1, sda_dev_m2])
2850     sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2851                               logical_id=(vgname, names[2]))
2852     sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2853                               logical_id=(vgname, names[3]))
2854     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
2855                               size=swap_sz,
2856                               children = [sdb_dev_m1, sdb_dev_m2])
2857     disks = [md_sda_dev, md_sdb_dev]
2858   elif template_name == constants.DT_REMOTE_RAID1:
2859     if len(secondary_nodes) != 1:
2860       raise errors.ProgrammerError("Wrong template configuration")
2861     remote_node = secondary_nodes[0]
2862     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2863                                        ".sdb_data", ".sdb_meta"])
2864     drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2865                                          disk_sz, names[0:2])
2866     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
2867                               children = [drbd_sda_dev], size=disk_sz)
2868     drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2869                                          swap_sz, names[2:4])
2870     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
2871                               children = [drbd_sdb_dev], size=swap_sz)
2872     disks = [md_sda_dev, md_sdb_dev]
2873   elif template_name == constants.DT_DRBD8:
2874     if len(secondary_nodes) != 1:
2875       raise errors.ProgrammerError("Wrong template configuration")
2876     remote_node = secondary_nodes[0]
2877     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2878                                        ".sdb_data", ".sdb_meta"])
2879     drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2880                                          disk_sz, names[0:2], "sda")
2881     drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2882                                          swap_sz, names[2:4], "sdb")
2883     disks = [drbd_sda_dev, drbd_sdb_dev]
2884   else:
2885     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2886   return disks
2887
2888
2889 def _GetInstanceInfoText(instance):
2890   """Compute that text that should be added to the disk's metadata.
2891
2892   """
2893   return "originstname+%s" % instance.name
2894
2895
2896 def _CreateDisks(cfg, instance):
2897   """Create all disks for an instance.
2898
2899   This abstracts away some work from AddInstance.
2900
2901   Args:
2902     instance: the instance object
2903
2904   Returns:
2905     True or False showing the success of the creation process
2906
2907   """
2908   info = _GetInstanceInfoText(instance)
2909
2910   for device in instance.disks:
2911     logger.Info("creating volume %s for instance %s" %
2912               (device.iv_name, instance.name))
2913     #HARDCODE
2914     for secondary_node in instance.secondary_nodes:
2915       if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2916                                         device, False, info):
2917         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2918                      (device.iv_name, device, secondary_node))
2919         return False
2920     #HARDCODE
2921     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2922                                     instance, device, info):
2923       logger.Error("failed to create volume %s on primary!" %
2924                    device.iv_name)
2925       return False
2926   return True
2927
2928
2929 def _RemoveDisks(instance, cfg):
2930   """Remove all disks for an instance.
2931
2932   This abstracts away some work from `AddInstance()` and
2933   `RemoveInstance()`. Note that in case some of the devices couldn't
2934   be removed, the removal will continue with the other ones (compare
2935   with `_CreateDisks()`).
2936
2937   Args:
2938     instance: the instance object
2939
2940   Returns:
2941     True or False showing the success of the removal proces
2942
2943   """
2944   logger.Info("removing block devices for instance %s" % instance.name)
2945
2946   result = True
2947   for device in instance.disks:
2948     for node, disk in device.ComputeNodeTree(instance.primary_node):
2949       cfg.SetDiskID(disk, node)
2950       if not rpc.call_blockdev_remove(node, disk):
2951         logger.Error("could not remove block device %s on node %s,"
2952                      " continuing anyway" %
2953                      (device.iv_name, node))
2954         result = False
2955   return result
2956
2957
2958 class LUCreateInstance(LogicalUnit):
2959   """Create an instance.
2960
2961   """
2962   HPATH = "instance-add"
2963   HTYPE = constants.HTYPE_INSTANCE
2964   _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2965               "disk_template", "swap_size", "mode", "start", "vcpus",
2966               "wait_for_sync", "ip_check", "mac"]
2967
2968   def BuildHooksEnv(self):
2969     """Build hooks env.
2970
2971     This runs on master, primary and secondary nodes of the instance.
2972
2973     """
2974     env = {
2975       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2976       "INSTANCE_DISK_SIZE": self.op.disk_size,
2977       "INSTANCE_SWAP_SIZE": self.op.swap_size,
2978       "INSTANCE_ADD_MODE": self.op.mode,
2979       }
2980     if self.op.mode == constants.INSTANCE_IMPORT:
2981       env["INSTANCE_SRC_NODE"] = self.op.src_node
2982       env["INSTANCE_SRC_PATH"] = self.op.src_path
2983       env["INSTANCE_SRC_IMAGE"] = self.src_image
2984
2985     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2986       primary_node=self.op.pnode,
2987       secondary_nodes=self.secondaries,
2988       status=self.instance_status,
2989       os_type=self.op.os_type,
2990       memory=self.op.mem_size,
2991       vcpus=self.op.vcpus,
2992       nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
2993     ))
2994
2995     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2996           self.secondaries)
2997     return env, nl, nl
2998
2999
3000   def CheckPrereq(self):
3001     """Check prerequisites.
3002
3003     """
3004     for attr in ["kernel_path", "initrd_path", "hvm_boot_order"]:
3005       if not hasattr(self.op, attr):
3006         setattr(self.op, attr, None)
3007
3008     if self.op.mode not in (constants.INSTANCE_CREATE,
3009                             constants.INSTANCE_IMPORT):
3010       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3011                                  self.op.mode)
3012
3013     if self.op.mode == constants.INSTANCE_IMPORT:
3014       src_node = getattr(self.op, "src_node", None)
3015       src_path = getattr(self.op, "src_path", None)
3016       if src_node is None or src_path is None:
3017         raise errors.OpPrereqError("Importing an instance requires source"
3018                                    " node and path options")
3019       src_node_full = self.cfg.ExpandNodeName(src_node)
3020       if src_node_full is None:
3021         raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3022       self.op.src_node = src_node = src_node_full
3023
3024       if not os.path.isabs(src_path):
3025         raise errors.OpPrereqError("The source path must be absolute")
3026
3027       export_info = rpc.call_export_info(src_node, src_path)
3028
3029       if not export_info:
3030         raise errors.OpPrereqError("No export found in dir %s" % src_path)
3031
3032       if not export_info.has_section(constants.INISECT_EXP):
3033         raise errors.ProgrammerError("Corrupted export config")
3034
3035       ei_version = export_info.get(constants.INISECT_EXP, 'version')
3036       if (int(ei_version) != constants.EXPORT_VERSION):
3037         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3038                                    (ei_version, constants.EXPORT_VERSION))
3039
3040       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3041         raise errors.OpPrereqError("Can't import instance with more than"
3042                                    " one data disk")
3043
3044       # FIXME: are the old os-es, disk sizes, etc. useful?
3045       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3046       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3047                                                          'disk0_dump'))
3048       self.src_image = diskimage
3049     else: # INSTANCE_CREATE
3050       if getattr(self.op, "os_type", None) is None:
3051         raise errors.OpPrereqError("No guest OS specified")
3052
3053     # check primary node
3054     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3055     if pnode is None:
3056       raise errors.OpPrereqError("Primary node '%s' is unknown" %
3057                                  self.op.pnode)
3058     self.op.pnode = pnode.name
3059     self.pnode = pnode
3060     self.secondaries = []
3061     # disk template and mirror node verification
3062     if self.op.disk_template not in constants.DISK_TEMPLATES:
3063       raise errors.OpPrereqError("Invalid disk template name")
3064
3065     if self.op.disk_template in constants.DTS_NET_MIRROR:
3066       if getattr(self.op, "snode", None) is None:
3067         raise errors.OpPrereqError("The networked disk templates need"
3068                                    " a mirror node")
3069
3070       snode_name = self.cfg.ExpandNodeName(self.op.snode)
3071       if snode_name is None:
3072         raise errors.OpPrereqError("Unknown secondary node '%s'" %
3073                                    self.op.snode)
3074       elif snode_name == pnode.name:
3075         raise errors.OpPrereqError("The secondary node cannot be"
3076                                    " the primary node.")
3077       self.secondaries.append(snode_name)
3078
3079     # Required free disk space as a function of disk and swap space
3080     req_size_dict = {
3081       constants.DT_DISKLESS: None,
3082       constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
3083       constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
3084       # 256 MB are added for drbd metadata, 128MB for each drbd device
3085       constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
3086       constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
3087     }
3088
3089     if self.op.disk_template not in req_size_dict:
3090       raise errors.ProgrammerError("Disk template '%s' size requirement"
3091                                    " is unknown" %  self.op.disk_template)
3092
3093     req_size = req_size_dict[self.op.disk_template]
3094
3095     # Check lv size requirements
3096     if req_size is not None:
3097       nodenames = [pnode.name] + self.secondaries
3098       nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3099       for node in nodenames:
3100         info = nodeinfo.get(node, None)
3101         if not info:
3102           raise errors.OpPrereqError("Cannot get current information"
3103                                      " from node '%s'" % nodeinfo)
3104         vg_free = info.get('vg_free', None)
3105         if not isinstance(vg_free, int):
3106           raise errors.OpPrereqError("Can't compute free disk space on"
3107                                      " node %s" % node)
3108         if req_size > info['vg_free']:
3109           raise errors.OpPrereqError("Not enough disk space on target node %s."
3110                                      " %d MB available, %d MB required" %
3111                                      (node, info['vg_free'], req_size))
3112
3113     # os verification
3114     os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3115     if not os_obj:
3116       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3117                                  " primary node"  % self.op.os_type)
3118
3119     if self.op.kernel_path == constants.VALUE_NONE:
3120       raise errors.OpPrereqError("Can't set instance kernel to none")
3121
3122     # instance verification
3123     hostname1 = utils.HostInfo(self.op.instance_name)
3124
3125     self.op.instance_name = instance_name = hostname1.name
3126     instance_list = self.cfg.GetInstanceList()
3127     if instance_name in instance_list:
3128       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3129                                  instance_name)
3130
3131     ip = getattr(self.op, "ip", None)
3132     if ip is None or ip.lower() == "none":
3133       inst_ip = None
3134     elif ip.lower() == "auto":
3135       inst_ip = hostname1.ip
3136     else:
3137       if not utils.IsValidIP(ip):
3138         raise errors.OpPrereqError("given IP address '%s' doesn't look"
3139                                    " like a valid IP" % ip)
3140       inst_ip = ip
3141     self.inst_ip = inst_ip
3142
3143     if self.op.start and not self.op.ip_check:
3144       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3145                                  " adding an instance in start mode")
3146
3147     if self.op.ip_check:
3148       if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3149         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3150                                    (hostname1.ip, instance_name))
3151
3152     # MAC address verification
3153     if self.op.mac != "auto":
3154       if not utils.IsValidMac(self.op.mac.lower()):
3155         raise errors.OpPrereqError("invalid MAC address specified: %s" %
3156                                    self.op.mac)
3157
3158     # bridge verification
3159     bridge = getattr(self.op, "bridge", None)
3160     if bridge is None:
3161       self.op.bridge = self.cfg.GetDefBridge()
3162     else:
3163       self.op.bridge = bridge
3164
3165     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3166       raise errors.OpPrereqError("target bridge '%s' does not exist on"
3167                                  " destination node '%s'" %
3168                                  (self.op.bridge, pnode.name))
3169
3170     # boot order verification
3171     if self.op.hvm_boot_order is not None:
3172       if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3173         raise errors.OpPrereqError("invalid boot order specified,"
3174                                    " must be one or more of [acdn]")
3175
3176     if self.op.start:
3177       self.instance_status = 'up'
3178     else:
3179       self.instance_status = 'down'
3180
3181   def Exec(self, feedback_fn):
3182     """Create and add the instance to the cluster.
3183
3184     """
3185     instance = self.op.instance_name
3186     pnode_name = self.pnode.name
3187
3188     if self.op.mac == "auto":
3189       mac_address = self.cfg.GenerateMAC()
3190     else:
3191       mac_address = self.op.mac
3192
3193     nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3194     if self.inst_ip is not None:
3195       nic.ip = self.inst_ip
3196
3197     ht_kind = self.sstore.GetHypervisorType()
3198     if ht_kind in constants.HTS_REQ_PORT:
3199       network_port = self.cfg.AllocatePort()
3200     else:
3201       network_port = None
3202
3203     disks = _GenerateDiskTemplate(self.cfg,
3204                                   self.op.disk_template,
3205                                   instance, pnode_name,
3206                                   self.secondaries, self.op.disk_size,
3207                                   self.op.swap_size)
3208
3209     iobj = objects.Instance(name=instance, os=self.op.os_type,
3210                             primary_node=pnode_name,
3211                             memory=self.op.mem_size,
3212                             vcpus=self.op.vcpus,
3213                             nics=[nic], disks=disks,
3214                             disk_template=self.op.disk_template,
3215                             status=self.instance_status,
3216                             network_port=network_port,
3217                             kernel_path=self.op.kernel_path,
3218                             initrd_path=self.op.initrd_path,
3219                             hvm_boot_order=self.op.hvm_boot_order,
3220                             )
3221
3222     feedback_fn("* creating instance disks...")
3223     if not _CreateDisks(self.cfg, iobj):
3224       _RemoveDisks(iobj, self.cfg)
3225       raise errors.OpExecError("Device creation failed, reverting...")
3226
3227     feedback_fn("adding instance %s to cluster config" % instance)
3228
3229     self.cfg.AddInstance(iobj)
3230
3231     if self.op.wait_for_sync:
3232       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3233     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3234       # make sure the disks are not degraded (still sync-ing is ok)
3235       time.sleep(15)
3236       feedback_fn("* checking mirrors status")
3237       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3238     else:
3239       disk_abort = False
3240
3241     if disk_abort:
3242       _RemoveDisks(iobj, self.cfg)
3243       self.cfg.RemoveInstance(iobj.name)
3244       raise errors.OpExecError("There are some degraded disks for"
3245                                " this instance")
3246
3247     feedback_fn("creating os for instance %s on node %s" %
3248                 (instance, pnode_name))
3249
3250     if iobj.disk_template != constants.DT_DISKLESS:
3251       if self.op.mode == constants.INSTANCE_CREATE:
3252         feedback_fn("* running the instance OS create scripts...")
3253         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3254           raise errors.OpExecError("could not add os for instance %s"
3255                                    " on node %s" %
3256                                    (instance, pnode_name))
3257
3258       elif self.op.mode == constants.INSTANCE_IMPORT:
3259         feedback_fn("* running the instance OS import scripts...")
3260         src_node = self.op.src_node
3261         src_image = self.src_image
3262         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3263                                                 src_node, src_image):
3264           raise errors.OpExecError("Could not import os for instance"
3265                                    " %s on node %s" %
3266                                    (instance, pnode_name))
3267       else:
3268         # also checked in the prereq part
3269         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3270                                      % self.op.mode)
3271
3272     if self.op.start:
3273       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3274       feedback_fn("* starting instance...")
3275       if not rpc.call_instance_start(pnode_name, iobj, None):
3276         raise errors.OpExecError("Could not start instance")
3277
3278
3279 class LUConnectConsole(NoHooksLU):
3280   """Connect to an instance's console.
3281
3282   This is somewhat special in that it returns the command line that
3283   you need to run on the master node in order to connect to the
3284   console.
3285
3286   """
3287   _OP_REQP = ["instance_name"]
3288
3289   def CheckPrereq(self):
3290     """Check prerequisites.
3291
3292     This checks that the instance is in the cluster.
3293
3294     """
3295     instance = self.cfg.GetInstanceInfo(
3296       self.cfg.ExpandInstanceName(self.op.instance_name))
3297     if instance is None:
3298       raise errors.OpPrereqError("Instance '%s' not known" %
3299                                  self.op.instance_name)
3300     self.instance = instance
3301
3302   def Exec(self, feedback_fn):
3303     """Connect to the console of an instance
3304
3305     """
3306     instance = self.instance
3307     node = instance.primary_node
3308
3309     node_insts = rpc.call_instance_list([node])[node]
3310     if node_insts is False:
3311       raise errors.OpExecError("Can't connect to node %s." % node)
3312
3313     if instance.name not in node_insts:
3314       raise errors.OpExecError("Instance %s is not running." % instance.name)
3315
3316     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3317
3318     hyper = hypervisor.GetHypervisor()
3319     console_cmd = hyper.GetShellCommandForConsole(instance)
3320     # build ssh cmdline
3321     argv = ["ssh", "-q", "-t"]
3322     argv.extend(ssh.KNOWN_HOSTS_OPTS)
3323     argv.extend(ssh.BATCH_MODE_OPTS)
3324     argv.append(node)
3325     argv.append(console_cmd)
3326     return "ssh", argv
3327
3328
3329 class LUAddMDDRBDComponent(LogicalUnit):
3330   """Adda new mirror member to an instance's disk.
3331
3332   """
3333   HPATH = "mirror-add"
3334   HTYPE = constants.HTYPE_INSTANCE
3335   _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3336
3337   def BuildHooksEnv(self):
3338     """Build hooks env.
3339
3340     This runs on the master, the primary and all the secondaries.
3341
3342     """
3343     env = {
3344       "NEW_SECONDARY": self.op.remote_node,
3345       "DISK_NAME": self.op.disk_name,
3346       }
3347     env.update(_BuildInstanceHookEnvByObject(self.instance))
3348     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3349           self.op.remote_node,] + list(self.instance.secondary_nodes)
3350     return env, nl, nl
3351
3352   def CheckPrereq(self):
3353     """Check prerequisites.
3354
3355     This checks that the instance is in the cluster.
3356
3357     """
3358     instance = self.cfg.GetInstanceInfo(
3359       self.cfg.ExpandInstanceName(self.op.instance_name))
3360     if instance is None:
3361       raise errors.OpPrereqError("Instance '%s' not known" %
3362                                  self.op.instance_name)
3363     self.instance = instance
3364
3365     remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3366     if remote_node is None:
3367       raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3368     self.remote_node = remote_node
3369
3370     if remote_node == instance.primary_node:
3371       raise errors.OpPrereqError("The specified node is the primary node of"
3372                                  " the instance.")
3373
3374     if instance.disk_template != constants.DT_REMOTE_RAID1:
3375       raise errors.OpPrereqError("Instance's disk layout is not"
3376                                  " remote_raid1.")
3377     for disk in instance.disks:
3378       if disk.iv_name == self.op.disk_name:
3379         break
3380     else:
3381       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3382                                  " instance." % self.op.disk_name)
3383     if len(disk.children) > 1:
3384       raise errors.OpPrereqError("The device already has two slave devices."
3385                                  " This would create a 3-disk raid1 which we"
3386                                  " don't allow.")
3387     self.disk = disk
3388
3389   def Exec(self, feedback_fn):
3390     """Add the mirror component
3391
3392     """
3393     disk = self.disk
3394     instance = self.instance
3395
3396     remote_node = self.remote_node
3397     lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3398     names = _GenerateUniqueNames(self.cfg, lv_names)
3399     new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3400                                      remote_node, disk.size, names)
3401
3402     logger.Info("adding new mirror component on secondary")
3403     #HARDCODE
3404     if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
3405                                       new_drbd, False,
3406                                       _GetInstanceInfoText(instance)):
3407       raise errors.OpExecError("Failed to create new component on secondary"
3408                                " node %s" % remote_node)
3409
3410     logger.Info("adding new mirror component on primary")
3411     #HARDCODE
3412     if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
3413                                     instance, new_drbd,
3414                                     _GetInstanceInfoText(instance)):
3415       # remove secondary dev
3416       self.cfg.SetDiskID(new_drbd, remote_node)
3417       rpc.call_blockdev_remove(remote_node, new_drbd)
3418       raise errors.OpExecError("Failed to create volume on primary")
3419
3420     # the device exists now
3421     # call the primary node to add the mirror to md
3422     logger.Info("adding new mirror component to md")
3423     if not rpc.call_blockdev_addchildren(instance.primary_node,
3424                                          disk, [new_drbd]):
3425       logger.Error("Can't add mirror compoment to md!")
3426       self.cfg.SetDiskID(new_drbd, remote_node)
3427       if not rpc.call_blockdev_remove(remote_node, new_drbd):
3428         logger.Error("Can't rollback on secondary")
3429       self.cfg.SetDiskID(new_drbd, instance.primary_node)
3430       if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3431         logger.Error("Can't rollback on primary")
3432       raise errors.OpExecError("Can't add mirror component to md array")
3433
3434     disk.children.append(new_drbd)
3435
3436     self.cfg.AddInstance(instance)
3437
3438     _WaitForSync(self.cfg, instance, self.proc)
3439
3440     return 0
3441
3442
3443 class LURemoveMDDRBDComponent(LogicalUnit):
3444   """Remove a component from a remote_raid1 disk.
3445
3446   """
3447   HPATH = "mirror-remove"
3448   HTYPE = constants.HTYPE_INSTANCE
3449   _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3450
3451   def BuildHooksEnv(self):
3452     """Build hooks env.
3453
3454     This runs on the master, the primary and all the secondaries.
3455
3456     """
3457     env = {
3458       "DISK_NAME": self.op.disk_name,
3459       "DISK_ID": self.op.disk_id,
3460       "OLD_SECONDARY": self.old_secondary,
3461       }
3462     env.update(_BuildInstanceHookEnvByObject(self.instance))
3463     nl = [self.sstore.GetMasterNode(),
3464           self.instance.primary_node] + list(self.instance.secondary_nodes)
3465     return env, nl, nl
3466
3467   def CheckPrereq(self):
3468     """Check prerequisites.
3469
3470     This checks that the instance is in the cluster.
3471
3472     """
3473     instance = self.cfg.GetInstanceInfo(
3474       self.cfg.ExpandInstanceName(self.op.instance_name))
3475     if instance is None:
3476       raise errors.OpPrereqError("Instance '%s' not known" %
3477                                  self.op.instance_name)
3478     self.instance = instance
3479
3480     if instance.disk_template != constants.DT_REMOTE_RAID1:
3481       raise errors.OpPrereqError("Instance's disk layout is not"
3482                                  " remote_raid1.")
3483     for disk in instance.disks:
3484       if disk.iv_name == self.op.disk_name:
3485         break
3486     else:
3487       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3488                                  " instance." % self.op.disk_name)
3489     for child in disk.children:
3490       if (child.dev_type == constants.LD_DRBD7 and
3491           child.logical_id[2] == self.op.disk_id):
3492         break
3493     else:
3494       raise errors.OpPrereqError("Can't find the device with this port.")
3495
3496     if len(disk.children) < 2:
3497       raise errors.OpPrereqError("Cannot remove the last component from"
3498                                  " a mirror.")
3499     self.disk = disk
3500     self.child = child
3501     if self.child.logical_id[0] == instance.primary_node:
3502       oid = 1
3503     else:
3504       oid = 0
3505     self.old_secondary = self.child.logical_id[oid]
3506
3507   def Exec(self, feedback_fn):
3508     """Remove the mirror component
3509
3510     """
3511     instance = self.instance
3512     disk = self.disk
3513     child = self.child
3514     logger.Info("remove mirror component")
3515     self.cfg.SetDiskID(disk, instance.primary_node)
3516     if not rpc.call_blockdev_removechildren(instance.primary_node,
3517                                             disk, [child]):
3518       raise errors.OpExecError("Can't remove child from mirror.")
3519
3520     for node in child.logical_id[:2]:
3521       self.cfg.SetDiskID(child, node)
3522       if not rpc.call_blockdev_remove(node, child):
3523         logger.Error("Warning: failed to remove device from node %s,"
3524                      " continuing operation." % node)
3525
3526     disk.children.remove(child)
3527     self.cfg.AddInstance(instance)
3528
3529
3530 class LUReplaceDisks(LogicalUnit):
3531   """Replace the disks of an instance.
3532
3533   """
3534   HPATH = "mirrors-replace"
3535   HTYPE = constants.HTYPE_INSTANCE
3536   _OP_REQP = ["instance_name", "mode", "disks"]
3537
3538   def BuildHooksEnv(self):
3539     """Build hooks env.
3540
3541     This runs on the master, the primary and all the secondaries.
3542
3543     """
3544     env = {
3545       "MODE": self.op.mode,
3546       "NEW_SECONDARY": self.op.remote_node,
3547       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3548       }
3549     env.update(_BuildInstanceHookEnvByObject(self.instance))
3550     nl = [
3551       self.sstore.GetMasterNode(),
3552       self.instance.primary_node,
3553       ]
3554     if self.op.remote_node is not None:
3555       nl.append(self.op.remote_node)
3556     return env, nl, nl
3557
3558   def CheckPrereq(self):
3559     """Check prerequisites.
3560
3561     This checks that the instance is in the cluster.
3562
3563     """
3564     instance = self.cfg.GetInstanceInfo(
3565       self.cfg.ExpandInstanceName(self.op.instance_name))
3566     if instance is None:
3567       raise errors.OpPrereqError("Instance '%s' not known" %
3568                                  self.op.instance_name)
3569     self.instance = instance
3570     self.op.instance_name = instance.name
3571
3572     if instance.disk_template not in constants.DTS_NET_MIRROR:
3573       raise errors.OpPrereqError("Instance's disk layout is not"
3574                                  " network mirrored.")
3575
3576     if len(instance.secondary_nodes) != 1:
3577       raise errors.OpPrereqError("The instance has a strange layout,"
3578                                  " expected one secondary but found %d" %
3579                                  len(instance.secondary_nodes))
3580
3581     self.sec_node = instance.secondary_nodes[0]
3582
3583     remote_node = getattr(self.op, "remote_node", None)
3584     if remote_node is not None:
3585       remote_node = self.cfg.ExpandNodeName(remote_node)
3586       if remote_node is None:
3587         raise errors.OpPrereqError("Node '%s' not known" %
3588                                    self.op.remote_node)
3589       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3590     else:
3591       self.remote_node_info = None
3592     if remote_node == instance.primary_node:
3593       raise errors.OpPrereqError("The specified node is the primary node of"
3594                                  " the instance.")
3595     elif remote_node == self.sec_node:
3596       if self.op.mode == constants.REPLACE_DISK_SEC:
3597         # this is for DRBD8, where we can't execute the same mode of
3598         # replacement as for drbd7 (no different port allocated)
3599         raise errors.OpPrereqError("Same secondary given, cannot execute"
3600                                    " replacement")
3601       # the user gave the current secondary, switch to
3602       # 'no-replace-secondary' mode for drbd7
3603       remote_node = None
3604     if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3605         self.op.mode != constants.REPLACE_DISK_ALL):
3606       raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3607                                  " disks replacement, not individual ones")
3608     if instance.disk_template == constants.DT_DRBD8:
3609       if (self.op.mode == constants.REPLACE_DISK_ALL and
3610           remote_node is not None):
3611         # switch to replace secondary mode
3612         self.op.mode = constants.REPLACE_DISK_SEC
3613
3614       if self.op.mode == constants.REPLACE_DISK_ALL:
3615         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3616                                    " secondary disk replacement, not"
3617                                    " both at once")
3618       elif self.op.mode == constants.REPLACE_DISK_PRI:
3619         if remote_node is not None:
3620           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3621                                      " the secondary while doing a primary"
3622                                      " node disk replacement")
3623         self.tgt_node = instance.primary_node
3624         self.oth_node = instance.secondary_nodes[0]
3625       elif self.op.mode == constants.REPLACE_DISK_SEC:
3626         self.new_node = remote_node # this can be None, in which case
3627                                     # we don't change the secondary
3628         self.tgt_node = instance.secondary_nodes[0]
3629         self.oth_node = instance.primary_node
3630       else:
3631         raise errors.ProgrammerError("Unhandled disk replace mode")
3632
3633     for name in self.op.disks:
3634       if instance.FindDisk(name) is None:
3635         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3636                                    (name, instance.name))
3637     self.op.remote_node = remote_node
3638
3639   def _ExecRR1(self, feedback_fn):
3640     """Replace the disks of an instance.
3641
3642     """
3643     instance = self.instance
3644     iv_names = {}
3645     # start of work
3646     if self.op.remote_node is None:
3647       remote_node = self.sec_node
3648     else:
3649       remote_node = self.op.remote_node
3650     cfg = self.cfg
3651     for dev in instance.disks:
3652       size = dev.size
3653       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3654       names = _GenerateUniqueNames(cfg, lv_names)
3655       new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3656                                        remote_node, size, names)
3657       iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3658       logger.Info("adding new mirror component on secondary for %s" %
3659                   dev.iv_name)
3660       #HARDCODE
3661       if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3662                                         new_drbd, False,
3663                                         _GetInstanceInfoText(instance)):
3664         raise errors.OpExecError("Failed to create new component on secondary"
3665                                  " node %s. Full abort, cleanup manually!" %
3666                                  remote_node)
3667
3668       logger.Info("adding new mirror component on primary")
3669       #HARDCODE
3670       if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3671                                       instance, new_drbd,
3672                                       _GetInstanceInfoText(instance)):
3673         # remove secondary dev
3674         cfg.SetDiskID(new_drbd, remote_node)
3675         rpc.call_blockdev_remove(remote_node, new_drbd)
3676         raise errors.OpExecError("Failed to create volume on primary!"
3677                                  " Full abort, cleanup manually!!")
3678
3679       # the device exists now
3680       # call the primary node to add the mirror to md
3681       logger.Info("adding new mirror component to md")
3682       if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3683                                            [new_drbd]):
3684         logger.Error("Can't add mirror compoment to md!")
3685         cfg.SetDiskID(new_drbd, remote_node)
3686         if not rpc.call_blockdev_remove(remote_node, new_drbd):
3687           logger.Error("Can't rollback on secondary")
3688         cfg.SetDiskID(new_drbd, instance.primary_node)
3689         if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3690           logger.Error("Can't rollback on primary")
3691         raise errors.OpExecError("Full abort, cleanup manually!!")
3692
3693       dev.children.append(new_drbd)
3694       cfg.AddInstance(instance)
3695
3696     # this can fail as the old devices are degraded and _WaitForSync
3697     # does a combined result over all disks, so we don't check its
3698     # return value
3699     _WaitForSync(cfg, instance, self.proc, unlock=True)
3700
3701     # so check manually all the devices
3702     for name in iv_names:
3703       dev, child, new_drbd = iv_names[name]
3704       cfg.SetDiskID(dev, instance.primary_node)
3705       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3706       if is_degr:
3707         raise errors.OpExecError("MD device %s is degraded!" % name)
3708       cfg.SetDiskID(new_drbd, instance.primary_node)
3709       is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3710       if is_degr:
3711         raise errors.OpExecError("New drbd device %s is degraded!" % name)
3712
3713     for name in iv_names:
3714       dev, child, new_drbd = iv_names[name]
3715       logger.Info("remove mirror %s component" % name)
3716       cfg.SetDiskID(dev, instance.primary_node)
3717       if not rpc.call_blockdev_removechildren(instance.primary_node,
3718                                               dev, [child]):
3719         logger.Error("Can't remove child from mirror, aborting"
3720                      " *this device cleanup*.\nYou need to cleanup manually!!")
3721         continue
3722
3723       for node in child.logical_id[:2]:
3724         logger.Info("remove child device on %s" % node)
3725         cfg.SetDiskID(child, node)
3726         if not rpc.call_blockdev_remove(node, child):
3727           logger.Error("Warning: failed to remove device from node %s,"
3728                        " continuing operation." % node)
3729
3730       dev.children.remove(child)
3731
3732       cfg.AddInstance(instance)
3733
3734   def _ExecD8DiskOnly(self, feedback_fn):
3735     """Replace a disk on the primary or secondary for dbrd8.
3736
3737     The algorithm for replace is quite complicated:
3738       - for each disk to be replaced:
3739         - create new LVs on the target node with unique names
3740         - detach old LVs from the drbd device
3741         - rename old LVs to name_replaced.<time_t>
3742         - rename new LVs to old LVs
3743         - attach the new LVs (with the old names now) to the drbd device
3744       - wait for sync across all devices
3745       - for each modified disk:
3746         - remove old LVs (which have the name name_replaces.<time_t>)
3747
3748     Failures are not very well handled.
3749
3750     """
3751     steps_total = 6
3752     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3753     instance = self.instance
3754     iv_names = {}
3755     vgname = self.cfg.GetVGName()
3756     # start of work
3757     cfg = self.cfg
3758     tgt_node = self.tgt_node
3759     oth_node = self.oth_node
3760
3761     # Step: check device activation
3762     self.proc.LogStep(1, steps_total, "check device existence")
3763     info("checking volume groups")
3764     my_vg = cfg.GetVGName()
3765     results = rpc.call_vg_list([oth_node, tgt_node])
3766     if not results:
3767       raise errors.OpExecError("Can't list volume groups on the nodes")
3768     for node in oth_node, tgt_node:
3769       res = results.get(node, False)
3770       if not res or my_vg not in res:
3771         raise errors.OpExecError("Volume group '%s' not found on %s" %
3772                                  (my_vg, node))
3773     for dev in instance.disks:
3774       if not dev.iv_name in self.op.disks:
3775         continue
3776       for node in tgt_node, oth_node:
3777         info("checking %s on %s" % (dev.iv_name, node))
3778         cfg.SetDiskID(dev, node)
3779         if not rpc.call_blockdev_find(node, dev):
3780           raise errors.OpExecError("Can't find device %s on node %s" %
3781                                    (dev.iv_name, node))
3782
3783     # Step: check other node consistency
3784     self.proc.LogStep(2, steps_total, "check peer consistency")
3785     for dev in instance.disks:
3786       if not dev.iv_name in self.op.disks:
3787         continue
3788       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3789       if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3790                                    oth_node==instance.primary_node):
3791         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3792                                  " to replace disks on this node (%s)" %
3793                                  (oth_node, tgt_node))
3794
3795     # Step: create new storage
3796     self.proc.LogStep(3, steps_total, "allocate new storage")
3797     for dev in instance.disks:
3798       if not dev.iv_name in self.op.disks:
3799         continue
3800       size = dev.size
3801       cfg.SetDiskID(dev, tgt_node)
3802       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3803       names = _GenerateUniqueNames(cfg, lv_names)
3804       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3805                              logical_id=(vgname, names[0]))
3806       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3807                              logical_id=(vgname, names[1]))
3808       new_lvs = [lv_data, lv_meta]
3809       old_lvs = dev.children
3810       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3811       info("creating new local storage on %s for %s" %
3812            (tgt_node, dev.iv_name))
3813       # since we *always* want to create this LV, we use the
3814       # _Create...OnPrimary (which forces the creation), even if we
3815       # are talking about the secondary node
3816       for new_lv in new_lvs:
3817         if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3818                                         _GetInstanceInfoText(instance)):
3819           raise errors.OpExecError("Failed to create new LV named '%s' on"
3820                                    " node '%s'" %
3821                                    (new_lv.logical_id[1], tgt_node))
3822
3823     # Step: for each lv, detach+rename*2+attach
3824     self.proc.LogStep(4, steps_total, "change drbd configuration")
3825     for dev, old_lvs, new_lvs in iv_names.itervalues():
3826       info("detaching %s drbd from local storage" % dev.iv_name)
3827       if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3828         raise errors.OpExecError("Can't detach drbd from local storage on node"
3829                                  " %s for device %s" % (tgt_node, dev.iv_name))
3830       #dev.children = []
3831       #cfg.Update(instance)
3832
3833       # ok, we created the new LVs, so now we know we have the needed
3834       # storage; as such, we proceed on the target node to rename
3835       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3836       # using the assumption that logical_id == physical_id (which in
3837       # turn is the unique_id on that node)
3838
3839       # FIXME(iustin): use a better name for the replaced LVs
3840       temp_suffix = int(time.time())
3841       ren_fn = lambda d, suff: (d.physical_id[0],
3842                                 d.physical_id[1] + "_replaced-%s" % suff)
3843       # build the rename list based on what LVs exist on the node
3844       rlist = []
3845       for to_ren in old_lvs:
3846         find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3847         if find_res is not None: # device exists
3848           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3849
3850       info("renaming the old LVs on the target node")
3851       if not rpc.call_blockdev_rename(tgt_node, rlist):
3852         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3853       # now we rename the new LVs to the old LVs
3854       info("renaming the new LVs on the target node")
3855       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3856       if not rpc.call_blockdev_rename(tgt_node, rlist):
3857         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3858
3859       for old, new in zip(old_lvs, new_lvs):
3860         new.logical_id = old.logical_id
3861         cfg.SetDiskID(new, tgt_node)
3862
3863       for disk in old_lvs:
3864         disk.logical_id = ren_fn(disk, temp_suffix)
3865         cfg.SetDiskID(disk, tgt_node)
3866
3867       # now that the new lvs have the old name, we can add them to the device
3868       info("adding new mirror component on %s" % tgt_node)
3869       if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3870         for new_lv in new_lvs:
3871           if not rpc.call_blockdev_remove(tgt_node, new_lv):
3872             warning("Can't rollback device %s", hint="manually cleanup unused"
3873                     " logical volumes")
3874         raise errors.OpExecError("Can't add local storage to drbd")
3875
3876       dev.children = new_lvs
3877       cfg.Update(instance)
3878
3879     # Step: wait for sync
3880
3881     # this can fail as the old devices are degraded and _WaitForSync
3882     # does a combined result over all disks, so we don't check its
3883     # return value
3884     self.proc.LogStep(5, steps_total, "sync devices")
3885     _WaitForSync(cfg, instance, self.proc, unlock=True)
3886
3887     # so check manually all the devices
3888     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3889       cfg.SetDiskID(dev, instance.primary_node)
3890       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3891       if is_degr:
3892         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3893
3894     # Step: remove old storage
3895     self.proc.LogStep(6, steps_total, "removing old storage")
3896     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3897       info("remove logical volumes for %s" % name)
3898       for lv in old_lvs:
3899         cfg.SetDiskID(lv, tgt_node)
3900         if not rpc.call_blockdev_remove(tgt_node, lv):
3901           warning("Can't remove old LV", hint="manually remove unused LVs")
3902           continue
3903
3904   def _ExecD8Secondary(self, feedback_fn):
3905     """Replace the secondary node for drbd8.
3906
3907     The algorithm for replace is quite complicated:
3908       - for all disks of the instance:
3909         - create new LVs on the new node with same names
3910         - shutdown the drbd device on the old secondary
3911         - disconnect the drbd network on the primary
3912         - create the drbd device on the new secondary
3913         - network attach the drbd on the primary, using an artifice:
3914           the drbd code for Attach() will connect to the network if it
3915           finds a device which is connected to the good local disks but
3916           not network enabled
3917       - wait for sync across all devices
3918       - remove all disks from the old secondary
3919
3920     Failures are not very well handled.
3921
3922     """
3923     steps_total = 6
3924     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3925     instance = self.instance
3926     iv_names = {}
3927     vgname = self.cfg.GetVGName()
3928     # start of work
3929     cfg = self.cfg
3930     old_node = self.tgt_node
3931     new_node = self.new_node
3932     pri_node = instance.primary_node
3933
3934     # Step: check device activation
3935     self.proc.LogStep(1, steps_total, "check device existence")
3936     info("checking volume groups")
3937     my_vg = cfg.GetVGName()
3938     results = rpc.call_vg_list([pri_node, new_node])
3939     if not results:
3940       raise errors.OpExecError("Can't list volume groups on the nodes")
3941     for node in pri_node, new_node:
3942       res = results.get(node, False)
3943       if not res or my_vg not in res:
3944         raise errors.OpExecError("Volume group '%s' not found on %s" %
3945                                  (my_vg, node))
3946     for dev in instance.disks:
3947       if not dev.iv_name in self.op.disks:
3948         continue
3949       info("checking %s on %s" % (dev.iv_name, pri_node))
3950       cfg.SetDiskID(dev, pri_node)
3951       if not rpc.call_blockdev_find(pri_node, dev):
3952         raise errors.OpExecError("Can't find device %s on node %s" %
3953                                  (dev.iv_name, pri_node))
3954
3955     # Step: check other node consistency
3956     self.proc.LogStep(2, steps_total, "check peer consistency")
3957     for dev in instance.disks:
3958       if not dev.iv_name in self.op.disks:
3959         continue
3960       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3961       if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3962         raise errors.OpExecError("Primary node (%s) has degraded storage,"
3963                                  " unsafe to replace the secondary" %
3964                                  pri_node)
3965
3966     # Step: create new storage
3967     self.proc.LogStep(3, steps_total, "allocate new storage")
3968     for dev in instance.disks:
3969       size = dev.size
3970       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3971       # since we *always* want to create this LV, we use the
3972       # _Create...OnPrimary (which forces the creation), even if we
3973       # are talking about the secondary node
3974       for new_lv in dev.children:
3975         if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3976                                         _GetInstanceInfoText(instance)):
3977           raise errors.OpExecError("Failed to create new LV named '%s' on"
3978                                    " node '%s'" %
3979                                    (new_lv.logical_id[1], new_node))
3980
3981       iv_names[dev.iv_name] = (dev, dev.children)
3982
3983     self.proc.LogStep(4, steps_total, "changing drbd configuration")
3984     for dev in instance.disks:
3985       size = dev.size
3986       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3987       # create new devices on new_node
3988       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3989                               logical_id=(pri_node, new_node,
3990                                           dev.logical_id[2]),
3991                               children=dev.children)
3992       if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3993                                         new_drbd, False,
3994                                       _GetInstanceInfoText(instance)):
3995         raise errors.OpExecError("Failed to create new DRBD on"
3996                                  " node '%s'" % new_node)
3997
3998     for dev in instance.disks:
3999       # we have new devices, shutdown the drbd on the old secondary
4000       info("shutting down drbd for %s on old node" % dev.iv_name)
4001       cfg.SetDiskID(dev, old_node)
4002       if not rpc.call_blockdev_shutdown(old_node, dev):
4003         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4004                 hint="Please cleanup this device manually as soon as possible")
4005
4006     info("detaching primary drbds from the network (=> standalone)")
4007     done = 0
4008     for dev in instance.disks:
4009       cfg.SetDiskID(dev, pri_node)
4010       # set the physical (unique in bdev terms) id to None, meaning
4011       # detach from network
4012       dev.physical_id = (None,) * len(dev.physical_id)
4013       # and 'find' the device, which will 'fix' it to match the
4014       # standalone state
4015       if rpc.call_blockdev_find(pri_node, dev):
4016         done += 1
4017       else:
4018         warning("Failed to detach drbd %s from network, unusual case" %
4019                 dev.iv_name)
4020
4021     if not done:
4022       # no detaches succeeded (very unlikely)
4023       raise errors.OpExecError("Can't detach at least one DRBD from old node")
4024
4025     # if we managed to detach at least one, we update all the disks of
4026     # the instance to point to the new secondary
4027     info("updating instance configuration")
4028     for dev in instance.disks:
4029       dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
4030       cfg.SetDiskID(dev, pri_node)
4031     cfg.Update(instance)
4032
4033     # and now perform the drbd attach
4034     info("attaching primary drbds to new secondary (standalone => connected)")
4035     failures = []
4036     for dev in instance.disks:
4037       info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4038       # since the attach is smart, it's enough to 'find' the device,
4039       # it will automatically activate the network, if the physical_id
4040       # is correct
4041       cfg.SetDiskID(dev, pri_node)
4042       if not rpc.call_blockdev_find(pri_node, dev):
4043         warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4044                 "please do a gnt-instance info to see the status of disks")
4045
4046     # this can fail as the old devices are degraded and _WaitForSync
4047     # does a combined result over all disks, so we don't check its
4048     # return value
4049     self.proc.LogStep(5, steps_total, "sync devices")
4050     _WaitForSync(cfg, instance, self.proc, unlock=True)
4051
4052     # so check manually all the devices
4053     for name, (dev, old_lvs) in iv_names.iteritems():
4054       cfg.SetDiskID(dev, pri_node)
4055       is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
4056       if is_degr:
4057         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4058
4059     self.proc.LogStep(6, steps_total, "removing old storage")
4060     for name, (dev, old_lvs) in iv_names.iteritems():
4061       info("remove logical volumes for %s" % name)
4062       for lv in old_lvs:
4063         cfg.SetDiskID(lv, old_node)
4064         if not rpc.call_blockdev_remove(old_node, lv):
4065           warning("Can't remove LV on old secondary",
4066                   hint="Cleanup stale volumes by hand")
4067
4068   def Exec(self, feedback_fn):
4069     """Execute disk replacement.
4070
4071     This dispatches the disk replacement to the appropriate handler.
4072
4073     """
4074     instance = self.instance
4075     if instance.disk_template == constants.DT_REMOTE_RAID1:
4076       fn = self._ExecRR1
4077     elif instance.disk_template == constants.DT_DRBD8:
4078       if self.op.remote_node is None:
4079         fn = self._ExecD8DiskOnly
4080       else:
4081         fn = self._ExecD8Secondary
4082     else:
4083       raise errors.ProgrammerError("Unhandled disk replacement case")
4084     return fn(feedback_fn)
4085
4086
4087 class LUQueryInstanceData(NoHooksLU):
4088   """Query runtime instance data.
4089
4090   """
4091   _OP_REQP = ["instances"]
4092
4093   def CheckPrereq(self):
4094     """Check prerequisites.
4095
4096     This only checks the optional instance list against the existing names.
4097
4098     """
4099     if not isinstance(self.op.instances, list):
4100       raise errors.OpPrereqError("Invalid argument type 'instances'")
4101     if self.op.instances:
4102       self.wanted_instances = []
4103       names = self.op.instances
4104       for name in names:
4105         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4106         if instance is None:
4107           raise errors.OpPrereqError("No such instance name '%s'" % name)
4108         self.wanted_instances.append(instance)
4109     else:
4110       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4111                                in self.cfg.GetInstanceList()]
4112     return
4113
4114
4115   def _ComputeDiskStatus(self, instance, snode, dev):
4116     """Compute block device status.
4117
4118     """
4119     self.cfg.SetDiskID(dev, instance.primary_node)
4120     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4121     if dev.dev_type in constants.LDS_DRBD:
4122       # we change the snode then (otherwise we use the one passed in)
4123       if dev.logical_id[0] == instance.primary_node:
4124         snode = dev.logical_id[1]
4125       else:
4126         snode = dev.logical_id[0]
4127
4128     if snode:
4129       self.cfg.SetDiskID(dev, snode)
4130       dev_sstatus = rpc.call_blockdev_find(snode, dev)
4131     else:
4132       dev_sstatus = None
4133
4134     if dev.children:
4135       dev_children = [self._ComputeDiskStatus(instance, snode, child)
4136                       for child in dev.children]
4137     else:
4138       dev_children = []
4139
4140     data = {
4141       "iv_name": dev.iv_name,
4142       "dev_type": dev.dev_type,
4143       "logical_id": dev.logical_id,
4144       "physical_id": dev.physical_id,
4145       "pstatus": dev_pstatus,
4146       "sstatus": dev_sstatus,
4147       "children": dev_children,
4148       }
4149
4150     return data
4151
4152   def Exec(self, feedback_fn):
4153     """Gather and return data"""
4154     result = {}
4155     for instance in self.wanted_instances:
4156       remote_info = rpc.call_instance_info(instance.primary_node,
4157                                                 instance.name)
4158       if remote_info and "state" in remote_info:
4159         remote_state = "up"
4160       else:
4161         remote_state = "down"
4162       if instance.status == "down":
4163         config_state = "down"
4164       else:
4165         config_state = "up"
4166
4167       disks = [self._ComputeDiskStatus(instance, None, device)
4168                for device in instance.disks]
4169
4170       idict = {
4171         "name": instance.name,
4172         "config_state": config_state,
4173         "run_state": remote_state,
4174         "pnode": instance.primary_node,
4175         "snodes": instance.secondary_nodes,
4176         "os": instance.os,
4177         "memory": instance.memory,
4178         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4179         "disks": disks,
4180         "network_port": instance.network_port,
4181         "vcpus": instance.vcpus,
4182         "kernel_path": instance.kernel_path,
4183         "initrd_path": instance.initrd_path,
4184         "hvm_boot_order": instance.hvm_boot_order,
4185         }
4186
4187       result[instance.name] = idict
4188
4189     return result
4190
4191
4192 class LUSetInstanceParms(LogicalUnit):
4193   """Modifies an instances's parameters.
4194
4195   """
4196   HPATH = "instance-modify"
4197   HTYPE = constants.HTYPE_INSTANCE
4198   _OP_REQP = ["instance_name"]
4199
4200   def BuildHooksEnv(self):
4201     """Build hooks env.
4202
4203     This runs on the master, primary and secondaries.
4204
4205     """
4206     args = dict()
4207     if self.mem:
4208       args['memory'] = self.mem
4209     if self.vcpus:
4210       args['vcpus'] = self.vcpus
4211     if self.do_ip or self.do_bridge or self.mac:
4212       if self.do_ip:
4213         ip = self.ip
4214       else:
4215         ip = self.instance.nics[0].ip
4216       if self.bridge:
4217         bridge = self.bridge
4218       else:
4219         bridge = self.instance.nics[0].bridge
4220       if self.mac:
4221         mac = self.mac
4222       else:
4223         mac = self.instance.nics[0].mac
4224       args['nics'] = [(ip, bridge, mac)]
4225     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4226     nl = [self.sstore.GetMasterNode(),
4227           self.instance.primary_node] + list(self.instance.secondary_nodes)
4228     return env, nl, nl
4229
4230   def CheckPrereq(self):
4231     """Check prerequisites.
4232
4233     This only checks the instance list against the existing names.
4234
4235     """
4236     self.mem = getattr(self.op, "mem", None)
4237     self.vcpus = getattr(self.op, "vcpus", None)
4238     self.ip = getattr(self.op, "ip", None)
4239     self.mac = getattr(self.op, "mac", None)
4240     self.bridge = getattr(self.op, "bridge", None)
4241     self.kernel_path = getattr(self.op, "kernel_path", None)
4242     self.initrd_path = getattr(self.op, "initrd_path", None)
4243     self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4244     all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4245                  self.kernel_path, self.initrd_path, self.hvm_boot_order]
4246     if all_parms.count(None) == len(all_parms):
4247       raise errors.OpPrereqError("No changes submitted")
4248     if self.mem is not None:
4249       try:
4250         self.mem = int(self.mem)
4251       except ValueError, err:
4252         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4253     if self.vcpus is not None:
4254       try:
4255         self.vcpus = int(self.vcpus)
4256       except ValueError, err:
4257         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4258     if self.ip is not None:
4259       self.do_ip = True
4260       if self.ip.lower() == "none":
4261         self.ip = None
4262       else:
4263         if not utils.IsValidIP(self.ip):
4264           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4265     else:
4266       self.do_ip = False
4267     self.do_bridge = (self.bridge is not None)
4268     if self.mac is not None:
4269       if self.cfg.IsMacInUse(self.mac):
4270         raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4271                                    self.mac)
4272       if not utils.IsValidMac(self.mac):
4273         raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4274
4275     if self.kernel_path is not None:
4276       self.do_kernel_path = True
4277       if self.kernel_path == constants.VALUE_NONE:
4278         raise errors.OpPrereqError("Can't set instance to no kernel")
4279
4280       if self.kernel_path != constants.VALUE_DEFAULT:
4281         if not os.path.isabs(self.kernel_path):
4282           raise errors.OpPrereqError("The kernel path must be an absolute"
4283                                     " filename")
4284     else:
4285       self.do_kernel_path = False
4286
4287     if self.initrd_path is not None:
4288       self.do_initrd_path = True
4289       if self.initrd_path not in (constants.VALUE_NONE,
4290                                   constants.VALUE_DEFAULT):
4291         if not os.path.isabs(self.initrd_path):
4292           raise errors.OpPrereqError("The initrd path must be an absolute"
4293                                     " filename")
4294     else:
4295       self.do_initrd_path = False
4296
4297     # boot order verification
4298     if self.hvm_boot_order is not None:
4299       if self.hvm_boot_order != constants.VALUE_DEFAULT:
4300         if len(self.hvm_boot_order.strip("acdn")) != 0:
4301           raise errors.OpPrereqError("invalid boot order specified,"
4302                                      " must be one or more of [acdn]"
4303                                      " or 'default'")
4304
4305     instance = self.cfg.GetInstanceInfo(
4306       self.cfg.ExpandInstanceName(self.op.instance_name))
4307     if instance is None:
4308       raise errors.OpPrereqError("No such instance name '%s'" %
4309                                  self.op.instance_name)
4310     self.op.instance_name = instance.name
4311     self.instance = instance
4312     return
4313
4314   def Exec(self, feedback_fn):
4315     """Modifies an instance.
4316
4317     All parameters take effect only at the next restart of the instance.
4318     """
4319     result = []
4320     instance = self.instance
4321     if self.mem:
4322       instance.memory = self.mem
4323       result.append(("mem", self.mem))
4324     if self.vcpus:
4325       instance.vcpus = self.vcpus
4326       result.append(("vcpus",  self.vcpus))
4327     if self.do_ip:
4328       instance.nics[0].ip = self.ip
4329       result.append(("ip", self.ip))
4330     if self.bridge:
4331       instance.nics[0].bridge = self.bridge
4332       result.append(("bridge", self.bridge))
4333     if self.mac:
4334       instance.nics[0].mac = self.mac
4335       result.append(("mac", self.mac))
4336     if self.do_kernel_path:
4337       instance.kernel_path = self.kernel_path
4338       result.append(("kernel_path", self.kernel_path))
4339     if self.do_initrd_path:
4340       instance.initrd_path = self.initrd_path
4341       result.append(("initrd_path", self.initrd_path))
4342     if self.hvm_boot_order:
4343       if self.hvm_boot_order == constants.VALUE_DEFAULT:
4344         instance.hvm_boot_order = None
4345       else:
4346         instance.hvm_boot_order = self.hvm_boot_order
4347       result.append(("hvm_boot_order", self.hvm_boot_order))
4348
4349     self.cfg.AddInstance(instance)
4350
4351     return result
4352
4353
4354 class LUQueryExports(NoHooksLU):
4355   """Query the exports list
4356
4357   """
4358   _OP_REQP = []
4359
4360   def CheckPrereq(self):
4361     """Check that the nodelist contains only existing nodes.
4362
4363     """
4364     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4365
4366   def Exec(self, feedback_fn):
4367     """Compute the list of all the exported system images.
4368
4369     Returns:
4370       a dictionary with the structure node->(export-list)
4371       where export-list is a list of the instances exported on
4372       that node.
4373
4374     """
4375     return rpc.call_export_list(self.nodes)
4376
4377
4378 class LUExportInstance(LogicalUnit):
4379   """Export an instance to an image in the cluster.
4380
4381   """
4382   HPATH = "instance-export"
4383   HTYPE = constants.HTYPE_INSTANCE
4384   _OP_REQP = ["instance_name", "target_node", "shutdown"]
4385
4386   def BuildHooksEnv(self):
4387     """Build hooks env.
4388
4389     This will run on the master, primary node and target node.
4390
4391     """
4392     env = {
4393       "EXPORT_NODE": self.op.target_node,
4394       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4395       }
4396     env.update(_BuildInstanceHookEnvByObject(self.instance))
4397     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4398           self.op.target_node]
4399     return env, nl, nl
4400
4401   def CheckPrereq(self):
4402     """Check prerequisites.
4403
4404     This checks that the instance name is a valid one.
4405
4406     """
4407     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4408     self.instance = self.cfg.GetInstanceInfo(instance_name)
4409     if self.instance is None:
4410       raise errors.OpPrereqError("Instance '%s' not found" %
4411                                  self.op.instance_name)
4412
4413     # node verification
4414     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4415     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4416
4417     if self.dst_node is None:
4418       raise errors.OpPrereqError("Destination node '%s' is unknown." %
4419                                  self.op.target_node)
4420     self.op.target_node = self.dst_node.name
4421
4422   def Exec(self, feedback_fn):
4423     """Export an instance to an image in the cluster.
4424
4425     """
4426     instance = self.instance
4427     dst_node = self.dst_node
4428     src_node = instance.primary_node
4429     if self.op.shutdown:
4430       # shutdown the instance, but not the disks
4431       if not rpc.call_instance_shutdown(src_node, instance):
4432          raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4433                                  (instance.name, source_node))
4434
4435     vgname = self.cfg.GetVGName()
4436
4437     snap_disks = []
4438
4439     try:
4440       for disk in instance.disks:
4441         if disk.iv_name == "sda":
4442           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4443           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4444
4445           if not new_dev_name:
4446             logger.Error("could not snapshot block device %s on node %s" %
4447                          (disk.logical_id[1], src_node))
4448           else:
4449             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4450                                       logical_id=(vgname, new_dev_name),
4451                                       physical_id=(vgname, new_dev_name),
4452                                       iv_name=disk.iv_name)
4453             snap_disks.append(new_dev)
4454
4455     finally:
4456       if self.op.shutdown and instance.status == "up":
4457         if not rpc.call_instance_start(src_node, instance, None):
4458           _ShutdownInstanceDisks(instance, self.cfg)
4459           raise errors.OpExecError("Could not start instance")
4460
4461     # TODO: check for size
4462
4463     for dev in snap_disks:
4464       if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
4465                                            instance):
4466         logger.Error("could not export block device %s from node"
4467                      " %s to node %s" %
4468                      (dev.logical_id[1], src_node, dst_node.name))
4469       if not rpc.call_blockdev_remove(src_node, dev):
4470         logger.Error("could not remove snapshot block device %s from"
4471                      " node %s" % (dev.logical_id[1], src_node))
4472
4473     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4474       logger.Error("could not finalize export for instance %s on node %s" %
4475                    (instance.name, dst_node.name))
4476
4477     nodelist = self.cfg.GetNodeList()
4478     nodelist.remove(dst_node.name)
4479
4480     # on one-node clusters nodelist will be empty after the removal
4481     # if we proceed the backup would be removed because OpQueryExports
4482     # substitutes an empty list with the full cluster node list.
4483     if nodelist:
4484       op = opcodes.OpQueryExports(nodes=nodelist)
4485       exportlist = self.proc.ChainOpCode(op)
4486       for node in exportlist:
4487         if instance.name in exportlist[node]:
4488           if not rpc.call_export_remove(node, instance.name):
4489             logger.Error("could not remove older export for instance %s"
4490                          " on node %s" % (instance.name, node))
4491
4492
4493 class TagsLU(NoHooksLU):
4494   """Generic tags LU.
4495
4496   This is an abstract class which is the parent of all the other tags LUs.
4497
4498   """
4499   def CheckPrereq(self):
4500     """Check prerequisites.
4501
4502     """
4503     if self.op.kind == constants.TAG_CLUSTER:
4504       self.target = self.cfg.GetClusterInfo()
4505     elif self.op.kind == constants.TAG_NODE:
4506       name = self.cfg.ExpandNodeName(self.op.name)
4507       if name is None:
4508         raise errors.OpPrereqError("Invalid node name (%s)" %
4509                                    (self.op.name,))
4510       self.op.name = name
4511       self.target = self.cfg.GetNodeInfo(name)
4512     elif self.op.kind == constants.TAG_INSTANCE:
4513       name = self.cfg.ExpandInstanceName(self.op.name)
4514       if name is None:
4515         raise errors.OpPrereqError("Invalid instance name (%s)" %
4516                                    (self.op.name,))
4517       self.op.name = name
4518       self.target = self.cfg.GetInstanceInfo(name)
4519     else:
4520       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4521                                  str(self.op.kind))
4522
4523
4524 class LUGetTags(TagsLU):
4525   """Returns the tags of a given object.
4526
4527   """
4528   _OP_REQP = ["kind", "name"]
4529
4530   def Exec(self, feedback_fn):
4531     """Returns the tag list.
4532
4533     """
4534     return self.target.GetTags()
4535
4536
4537 class LUSearchTags(NoHooksLU):
4538   """Searches the tags for a given pattern.
4539
4540   """
4541   _OP_REQP = ["pattern"]
4542
4543   def CheckPrereq(self):
4544     """Check prerequisites.
4545
4546     This checks the pattern passed for validity by compiling it.
4547
4548     """
4549     try:
4550       self.re = re.compile(self.op.pattern)
4551     except re.error, err:
4552       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4553                                  (self.op.pattern, err))
4554
4555   def Exec(self, feedback_fn):
4556     """Returns the tag list.
4557
4558     """
4559     cfg = self.cfg
4560     tgts = [("/cluster", cfg.GetClusterInfo())]
4561     ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4562     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4563     nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4564     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4565     results = []
4566     for path, target in tgts:
4567       for tag in target.GetTags():
4568         if self.re.search(tag):
4569           results.append((path, tag))
4570     return results
4571
4572
4573 class LUAddTags(TagsLU):
4574   """Sets a tag on a given object.
4575
4576   """
4577   _OP_REQP = ["kind", "name", "tags"]
4578
4579   def CheckPrereq(self):
4580     """Check prerequisites.
4581
4582     This checks the type and length of the tag name and value.
4583
4584     """
4585     TagsLU.CheckPrereq(self)
4586     for tag in self.op.tags:
4587       objects.TaggableObject.ValidateTag(tag)
4588
4589   def Exec(self, feedback_fn):
4590     """Sets the tag.
4591
4592     """
4593     try:
4594       for tag in self.op.tags:
4595         self.target.AddTag(tag)
4596     except errors.TagError, err:
4597       raise errors.OpExecError("Error while setting tag: %s" % str(err))
4598     try:
4599       self.cfg.Update(self.target)
4600     except errors.ConfigurationError:
4601       raise errors.OpRetryError("There has been a modification to the"
4602                                 " config file and the operation has been"
4603                                 " aborted. Please retry.")
4604
4605
4606 class LUDelTags(TagsLU):
4607   """Delete a list of tags from a given object.
4608
4609   """
4610   _OP_REQP = ["kind", "name", "tags"]
4611
4612   def CheckPrereq(self):
4613     """Check prerequisites.
4614
4615     This checks that we have the given tag.
4616
4617     """
4618     TagsLU.CheckPrereq(self)
4619     for tag in self.op.tags:
4620       objects.TaggableObject.ValidateTag(tag)
4621     del_tags = frozenset(self.op.tags)
4622     cur_tags = self.target.GetTags()
4623     if not del_tags <= cur_tags:
4624       diff_tags = del_tags - cur_tags
4625       diff_names = ["'%s'" % tag for tag in diff_tags]
4626       diff_names.sort()
4627       raise errors.OpPrereqError("Tag(s) %s not found" %
4628                                  (",".join(diff_names)))
4629
4630   def Exec(self, feedback_fn):
4631     """Remove the tag from the object.
4632
4633     """
4634     for tag in self.op.tags:
4635       self.target.RemoveTag(tag)
4636     try:
4637       self.cfg.Update(self.target)
4638     except errors.ConfigurationError:
4639       raise errors.OpRetryError("There has been a modification to the"
4640                                 " config file and the operation has been"
4641                                 " aborted. Please retry.")
4642
4643 class LUTestDelay(NoHooksLU):
4644   """Sleep for a specified amount of time.
4645
4646   This LU sleeps on the master and/or nodes for a specified amoutn of
4647   time.
4648
4649   """
4650   _OP_REQP = ["duration", "on_master", "on_nodes"]
4651
4652   def CheckPrereq(self):
4653     """Check prerequisites.
4654
4655     This checks that we have a good list of nodes and/or the duration
4656     is valid.
4657
4658     """
4659
4660     if self.op.on_nodes:
4661       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
4662
4663   def Exec(self, feedback_fn):
4664     """Do the actual sleep.
4665
4666     """
4667     if self.op.on_master:
4668       if not utils.TestDelay(self.op.duration):
4669         raise errors.OpExecError("Error during master delay test")
4670     if self.op.on_nodes:
4671       result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
4672       if not result:
4673         raise errors.OpExecError("Complete failure from rpc call")
4674       for node, node_result in result.items():
4675         if not node_result:
4676           raise errors.OpExecError("Failure during rpc call to node %s,"
4677                                    " result: %s" % (node, node_result))
4678
4679
4680 def _AllocatorGetClusterData(cfg, sstore):
4681   """Compute the generic allocator input data.
4682
4683   This is the data that is independent of the actual operation.
4684
4685   """
4686   # cluster data
4687   data = {
4688     "version": 1,
4689     "cluster_name": sstore.GetClusterName(),
4690     "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
4691     # we don't have job IDs
4692     }
4693
4694   # node data
4695   node_results = {}
4696   node_list = cfg.GetNodeList()
4697   node_data = rpc.call_node_info(node_list, cfg.GetVGName())
4698   for nname in node_list:
4699     ninfo = cfg.GetNodeInfo(nname)
4700     if nname not in node_data or not isinstance(node_data[nname], dict):
4701       raise errors.OpExecError("Can't get data for node %s" % nname)
4702     remote_info = node_data[nname]
4703     for attr in ['memory_total', 'memory_free',
4704                  'vg_size', 'vg_free']:
4705       if attr not in remote_info:
4706         raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
4707                                  (nname, attr))
4708       try:
4709         int(remote_info[attr])
4710       except ValueError, err:
4711         raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
4712                                  " %s" % (nname, attr, str(err)))
4713     pnr = {
4714       "tags": list(ninfo.GetTags()),
4715       "total_memory": utils.TryConvert(int, remote_info['memory_total']),
4716       "free_memory": utils.TryConvert(int, remote_info['memory_free']),
4717       "total_disk": utils.TryConvert(int, remote_info['vg_size']),
4718       "free_disk": utils.TryConvert(int, remote_info['vg_free']),
4719       "primary_ip": ninfo.primary_ip,
4720       "secondary_ip": ninfo.secondary_ip,
4721       }
4722     node_results[nname] = pnr
4723   data["nodes"] = node_results
4724
4725   # instance data
4726   instance_data = {}
4727   i_list = cfg.GetInstanceList()
4728   for iname in i_list:
4729     iinfo = cfg.GetInstanceInfo(iname)
4730     nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
4731                 for n in iinfo.nics]
4732     pir = {
4733       "tags": list(iinfo.GetTags()),
4734       "should_run": iinfo.status == "up",
4735       "vcpus": iinfo.vcpus,
4736       "memory": iinfo.memory,
4737       "os": iinfo.os,
4738       "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
4739       "nics": nic_data,
4740       "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
4741       "disk_template": iinfo.disk_template,
4742       }
4743     instance_data[iname] = pir
4744
4745   data["instances"] = instance_data
4746
4747   return data
4748
4749
4750 def _AllocatorAddNewInstance(data, op):
4751   """Add new instance data to allocator structure.
4752
4753   This in combination with _AllocatorGetClusterData will create the
4754   correct structure needed as input for the allocator.
4755
4756   The checks for the completeness of the opcode must have already been
4757   done.
4758
4759   """
4760   request = {
4761     "type": "allocate",
4762     "name": op.name,
4763     "disk_template": op.disk_template,
4764     "tags": op.tags,
4765     "os": op.os,
4766     "vcpus": op.vcpus,
4767     "memory": op.mem_size,
4768     "disks": op.disks,
4769     "nics": op.nics,
4770     }
4771   data["request"] = request
4772
4773
4774 def _AllocatorAddRelocateInstance(data, op):
4775   """Add relocate instance data to allocator structure.
4776
4777   This in combination with _AllocatorGetClusterData will create the
4778   correct structure needed as input for the allocator.
4779
4780   The checks for the completeness of the opcode must have already been
4781   done.
4782
4783   """
4784   request = {
4785     "type": "replace_secondary",
4786     "name": op.name,
4787     }
4788   data["request"] = request
4789
4790
4791 class LUTestAllocator(NoHooksLU):
4792   """Run allocator tests.
4793
4794   This LU runs the allocator tests
4795
4796   """
4797   _OP_REQP = ["direction", "mode", "name"]
4798
4799   def CheckPrereq(self):
4800     """Check prerequisites.
4801
4802     This checks the opcode parameters depending on the director and mode test.
4803
4804     """
4805     if self.op.mode == constants.ALF_MODE_ALLOC:
4806       for attr in ["name", "mem_size", "disks", "disk_template",
4807                    "os", "tags", "nics", "vcpus"]:
4808         if not hasattr(self.op, attr):
4809           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
4810                                      attr)
4811       iname = self.cfg.ExpandInstanceName(self.op.name)
4812       if iname is not None:
4813         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
4814                                    iname)
4815       if not isinstance(self.op.nics, list):
4816         raise errors.OpPrereqError("Invalid parameter 'nics'")
4817       for row in self.op.nics:
4818         if (not isinstance(row, dict) or
4819             "mac" not in row or
4820             "ip" not in row or
4821             "bridge" not in row):
4822           raise errors.OpPrereqError("Invalid contents of the"
4823                                      " 'nics' parameter")
4824       if not isinstance(self.op.disks, list):
4825         raise errors.OpPrereqError("Invalid parameter 'disks'")
4826       for row in self.op.disks:
4827         if (not isinstance(row, dict) or
4828             "size" not in row or
4829             not isinstance(row["size"], int) or
4830             "mode" not in row or
4831             row["mode"] not in ['r', 'w']):
4832           raise errors.OpPrereqError("Invalid contents of the"
4833                                      " 'disks' parameter")
4834     elif self.op.mode == constants.ALF_MODE_RELOC:
4835       if not hasattr(self.op, "name"):
4836         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
4837       fname = self.cfg.ExpandInstanceName(self.op.name)
4838       if fname is None:
4839         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
4840                                    self.op.name)
4841       self.op.name = fname
4842     else:
4843       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
4844                                  self.op.mode)
4845
4846     if self.op.direction == constants.ALF_DIR_OUT:
4847       if not hasattr(self.op, "allocator"):
4848         raise errors.OpPrereqError("Missing allocator name")
4849       raise errors.OpPrereqError("Allocator out mode not supported yet")
4850     elif self.op.direction != constants.ALF_DIR_IN:
4851       raise errors.OpPrereqError("Wrong allocator test '%s'" %
4852                                  self.op.direction)
4853
4854   def Exec(self, feedback_fn):
4855     """Run the allocator test.
4856
4857     """
4858     data = _AllocatorGetClusterData(self.cfg, self.sstore)
4859     if self.op.mode == constants.ALF_MODE_ALLOC:
4860       _AllocatorAddNewInstance(data, self.op)
4861     else:
4862       _AllocatorAddRelocateInstance(data, self.op)
4863
4864     if _JSON_INDENT is None:
4865       text = simplejson.dumps(data)
4866     else:
4867       text = simplejson.dumps(data, indent=_JSON_INDENT)
4868     return text