Codestyle fixes: adding a few empty lines
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import config
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import ssconf
45
46
47 class LogicalUnit(object):
48   """Logical Unit base class.
49
50   Subclasses must follow these rules:
51     - implement CheckPrereq which also fills in the opcode instance
52       with all the fields (even if as None)
53     - implement Exec
54     - implement BuildHooksEnv
55     - redefine HPATH and HTYPE
56     - optionally redefine their run requirements (REQ_CLUSTER,
57       REQ_MASTER); note that all commands require root permissions
58
59   """
60   HPATH = None
61   HTYPE = None
62   _OP_REQP = []
63   REQ_CLUSTER = True
64   REQ_MASTER = True
65
66   def __init__(self, processor, op, cfg, sstore):
67     """Constructor for LogicalUnit.
68
69     This needs to be overriden in derived classes in order to check op
70     validity.
71
72     """
73     self.proc = processor
74     self.op = op
75     self.cfg = cfg
76     self.sstore = sstore
77     for attr_name in self._OP_REQP:
78       attr_val = getattr(op, attr_name, None)
79       if attr_val is None:
80         raise errors.OpPrereqError("Required parameter '%s' missing" %
81                                    attr_name)
82     if self.REQ_CLUSTER:
83       if not cfg.IsCluster():
84         raise errors.OpPrereqError("Cluster not initialized yet,"
85                                    " use 'gnt-cluster init' first.")
86       if self.REQ_MASTER:
87         master = sstore.GetMasterNode()
88         if master != utils.HostInfo().name:
89           raise errors.OpPrereqError("Commands must be run on the master"
90                                      " node %s" % master)
91
92   def CheckPrereq(self):
93     """Check prerequisites for this LU.
94
95     This method should check that the prerequisites for the execution
96     of this LU are fulfilled. It can do internode communication, but
97     it should be idempotent - no cluster or system changes are
98     allowed.
99
100     The method should raise errors.OpPrereqError in case something is
101     not fulfilled. Its return value is ignored.
102
103     This method should also update all the parameters of the opcode to
104     their canonical form; e.g. a short node name must be fully
105     expanded after this method has successfully completed (so that
106     hooks, logging, etc. work correctly).
107
108     """
109     raise NotImplementedError
110
111   def Exec(self, feedback_fn):
112     """Execute the LU.
113
114     This method should implement the actual work. It should raise
115     errors.OpExecError for failures that are somewhat dealt with in
116     code, or expected.
117
118     """
119     raise NotImplementedError
120
121   def BuildHooksEnv(self):
122     """Build hooks environment for this LU.
123
124     This method should return a three-node tuple consisting of: a dict
125     containing the environment that will be used for running the
126     specific hook for this LU, a list of node names on which the hook
127     should run before the execution, and a list of node names on which
128     the hook should run after the execution.
129
130     The keys of the dict must not have 'GANETI_' prefixed as this will
131     be handled in the hooks runner. Also note additional keys will be
132     added by the hooks runner. If the LU doesn't define any
133     environment, an empty dict (and not None) should be returned.
134
135     As for the node lists, the master should not be included in the
136     them, as it will be added by the hooks runner in case this LU
137     requires a cluster to run on (otherwise we don't have a node
138     list). No nodes should be returned as an empty list (and not
139     None).
140
141     Note that if the HPATH for a LU class is None, this function will
142     not be called.
143
144     """
145     raise NotImplementedError
146
147
148 class NoHooksLU(LogicalUnit):
149   """Simple LU which runs no hooks.
150
151   This LU is intended as a parent for other LogicalUnits which will
152   run no hooks, in order to reduce duplicate code.
153
154   """
155   HPATH = None
156   HTYPE = None
157
158   def BuildHooksEnv(self):
159     """Build hooks env.
160
161     This is a no-op, since we don't run hooks.
162
163     """
164     return {}, [], []
165
166
167 def _AddHostToEtcHosts(hostname):
168   """Wrapper around utils.SetEtcHostsEntry.
169
170   """
171   hi = utils.HostInfo(name=hostname)
172   utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
173
174
175 def _RemoveHostFromEtcHosts(hostname):
176   """Wrapper around utils.RemoveEtcHostsEntry.
177
178   """
179   hi = utils.HostInfo(name=hostname)
180   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
181   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
182
183
184 def _GetWantedNodes(lu, nodes):
185   """Returns list of checked and expanded node names.
186
187   Args:
188     nodes: List of nodes (strings) or None for all
189
190   """
191   if not isinstance(nodes, list):
192     raise errors.OpPrereqError("Invalid argument type 'nodes'")
193
194   if nodes:
195     wanted = []
196
197     for name in nodes:
198       node = lu.cfg.ExpandNodeName(name)
199       if node is None:
200         raise errors.OpPrereqError("No such node name '%s'" % name)
201       wanted.append(node)
202
203   else:
204     wanted = lu.cfg.GetNodeList()
205   return utils.NiceSort(wanted)
206
207
208 def _GetWantedInstances(lu, instances):
209   """Returns list of checked and expanded instance names.
210
211   Args:
212     instances: List of instances (strings) or None for all
213
214   """
215   if not isinstance(instances, list):
216     raise errors.OpPrereqError("Invalid argument type 'instances'")
217
218   if instances:
219     wanted = []
220
221     for name in instances:
222       instance = lu.cfg.ExpandInstanceName(name)
223       if instance is None:
224         raise errors.OpPrereqError("No such instance name '%s'" % name)
225       wanted.append(instance)
226
227   else:
228     wanted = lu.cfg.GetInstanceList()
229   return utils.NiceSort(wanted)
230
231
232 def _CheckOutputFields(static, dynamic, selected):
233   """Checks whether all selected fields are valid.
234
235   Args:
236     static: Static fields
237     dynamic: Dynamic fields
238
239   """
240   static_fields = frozenset(static)
241   dynamic_fields = frozenset(dynamic)
242
243   all_fields = static_fields | dynamic_fields
244
245   if not all_fields.issuperset(selected):
246     raise errors.OpPrereqError("Unknown output fields selected: %s"
247                                % ",".join(frozenset(selected).
248                                           difference(all_fields)))
249
250
251 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
252                           memory, vcpus, nics):
253   """Builds instance related env variables for hooks from single variables.
254
255   Args:
256     secondary_nodes: List of secondary nodes as strings
257   """
258   env = {
259     "OP_TARGET": name,
260     "INSTANCE_NAME": name,
261     "INSTANCE_PRIMARY": primary_node,
262     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
263     "INSTANCE_OS_TYPE": os_type,
264     "INSTANCE_STATUS": status,
265     "INSTANCE_MEMORY": memory,
266     "INSTANCE_VCPUS": vcpus,
267   }
268
269   if nics:
270     nic_count = len(nics)
271     for idx, (ip, bridge, mac) in enumerate(nics):
272       if ip is None:
273         ip = ""
274       env["INSTANCE_NIC%d_IP" % idx] = ip
275       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
276       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
277   else:
278     nic_count = 0
279
280   env["INSTANCE_NIC_COUNT"] = nic_count
281
282   return env
283
284
285 def _BuildInstanceHookEnvByObject(instance, override=None):
286   """Builds instance related env variables for hooks from an object.
287
288   Args:
289     instance: objects.Instance object of instance
290     override: dict of values to override
291   """
292   args = {
293     'name': instance.name,
294     'primary_node': instance.primary_node,
295     'secondary_nodes': instance.secondary_nodes,
296     'os_type': instance.os,
297     'status': instance.os,
298     'memory': instance.memory,
299     'vcpus': instance.vcpus,
300     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
301   }
302   if override:
303     args.update(override)
304   return _BuildInstanceHookEnv(**args)
305
306
307 def _UpdateKnownHosts(fullnode, ip, pubkey):
308   """Ensure a node has a correct known_hosts entry.
309
310   Args:
311     fullnode - Fully qualified domain name of host. (str)
312     ip       - IPv4 address of host (str)
313     pubkey   - the public key of the cluster
314
315   """
316   if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
317     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
318   else:
319     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
320
321   inthere = False
322
323   save_lines = []
324   add_lines = []
325   removed = False
326
327   for rawline in f:
328     logger.Debug('read %s' % (repr(rawline),))
329
330     parts = rawline.rstrip('\r\n').split()
331
332     # Ignore unwanted lines
333     if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
334       fields = parts[0].split(',')
335       key = parts[2]
336
337       haveall = True
338       havesome = False
339       for spec in [ ip, fullnode ]:
340         if spec not in fields:
341           haveall = False
342         if spec in fields:
343           havesome = True
344
345       logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
346       if haveall and key == pubkey:
347         inthere = True
348         save_lines.append(rawline)
349         logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
350         continue
351
352       if havesome and (not haveall or key != pubkey):
353         removed = True
354         logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
355         continue
356
357     save_lines.append(rawline)
358
359   if not inthere:
360     add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
361     logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
362
363   if removed:
364     save_lines = save_lines + add_lines
365
366     # Write a new file and replace old.
367     fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
368                                    constants.DATA_DIR)
369     newfile = os.fdopen(fd, 'w')
370     try:
371       newfile.write(''.join(save_lines))
372     finally:
373       newfile.close()
374     logger.Debug("Wrote new known_hosts.")
375     os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
376
377   elif add_lines:
378     # Simply appending a new line will do the trick.
379     f.seek(0, 2)
380     for add in add_lines:
381       f.write(add)
382
383   f.close()
384
385
386 def _HasValidVG(vglist, vgname):
387   """Checks if the volume group list is valid.
388
389   A non-None return value means there's an error, and the return value
390   is the error message.
391
392   """
393   vgsize = vglist.get(vgname, None)
394   if vgsize is None:
395     return "volume group '%s' missing" % vgname
396   elif vgsize < 20480:
397     return ("volume group '%s' too small (20480MiB required, %dMib found)" %
398             (vgname, vgsize))
399   return None
400
401
402 def _InitSSHSetup(node):
403   """Setup the SSH configuration for the cluster.
404
405
406   This generates a dsa keypair for root, adds the pub key to the
407   permitted hosts and adds the hostkey to its own known hosts.
408
409   Args:
410     node: the name of this host as a fqdn
411
412   """
413   priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
414
415   for name in priv_key, pub_key:
416     if os.path.exists(name):
417       utils.CreateBackup(name)
418     utils.RemoveFile(name)
419
420   result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
421                          "-f", priv_key,
422                          "-q", "-N", ""])
423   if result.failed:
424     raise errors.OpExecError("Could not generate ssh keypair, error %s" %
425                              result.output)
426
427   f = open(pub_key, 'r')
428   try:
429     utils.AddAuthorizedKey(auth_keys, f.read(8192))
430   finally:
431     f.close()
432
433
434 def _InitGanetiServerSetup(ss):
435   """Setup the necessary configuration for the initial node daemon.
436
437   This creates the nodepass file containing the shared password for
438   the cluster and also generates the SSL certificate.
439
440   """
441   # Create pseudo random password
442   randpass = sha.new(os.urandom(64)).hexdigest()
443   # and write it into sstore
444   ss.SetKey(ss.SS_NODED_PASS, randpass)
445
446   result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
447                          "-days", str(365*5), "-nodes", "-x509",
448                          "-keyout", constants.SSL_CERT_FILE,
449                          "-out", constants.SSL_CERT_FILE, "-batch"])
450   if result.failed:
451     raise errors.OpExecError("could not generate server ssl cert, command"
452                              " %s had exitcode %s and error message %s" %
453                              (result.cmd, result.exit_code, result.output))
454
455   os.chmod(constants.SSL_CERT_FILE, 0400)
456
457   result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
458
459   if result.failed:
460     raise errors.OpExecError("Could not start the node daemon, command %s"
461                              " had exitcode %s and error %s" %
462                              (result.cmd, result.exit_code, result.output))
463
464
465 def _CheckInstanceBridgesExist(instance):
466   """Check that the brigdes needed by an instance exist.
467
468   """
469   # check bridges existance
470   brlist = [nic.bridge for nic in instance.nics]
471   if not rpc.call_bridges_exist(instance.primary_node, brlist):
472     raise errors.OpPrereqError("one or more target bridges %s does not"
473                                " exist on destination node '%s'" %
474                                (brlist, instance.primary_node))
475
476
477 class LUInitCluster(LogicalUnit):
478   """Initialise the cluster.
479
480   """
481   HPATH = "cluster-init"
482   HTYPE = constants.HTYPE_CLUSTER
483   _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
484               "def_bridge", "master_netdev"]
485   REQ_CLUSTER = False
486
487   def BuildHooksEnv(self):
488     """Build hooks env.
489
490     Notes: Since we don't require a cluster, we must manually add
491     ourselves in the post-run node list.
492
493     """
494     env = {"OP_TARGET": self.op.cluster_name}
495     return env, [], [self.hostname.name]
496
497   def CheckPrereq(self):
498     """Verify that the passed name is a valid one.
499
500     """
501     if config.ConfigWriter.IsCluster():
502       raise errors.OpPrereqError("Cluster is already initialised")
503
504     if self.op.hypervisor_type == constants.HT_XEN_HVM31:
505       if not os.path.exists(constants.VNC_PASSWORD_FILE):
506         raise errors.OpPrereqError("Please prepare the cluster VNC"
507                                    "password file %s" %
508                                    constants.VNC_PASSWORD_FILE)
509
510     self.hostname = hostname = utils.HostInfo()
511
512     if hostname.ip.startswith("127."):
513       raise errors.OpPrereqError("This host's IP resolves to the private"
514                                  " range (%s). Please fix DNS or %s." %
515                                  (hostname.ip, constants.ETC_HOSTS))
516
517     self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
518
519     if not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, hostname.ip,
520                          constants.DEFAULT_NODED_PORT):
521       raise errors.OpPrereqError("Inconsistency: this host's name resolves"
522                                  " to %s,\nbut this ip address does not"
523                                  " belong to this host."
524                                  " Aborting." % hostname.ip)
525
526     secondary_ip = getattr(self.op, "secondary_ip", None)
527     if secondary_ip and not utils.IsValidIP(secondary_ip):
528       raise errors.OpPrereqError("Invalid secondary ip given")
529     if (secondary_ip and
530         secondary_ip != hostname.ip and
531         (not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, secondary_ip,
532                            constants.DEFAULT_NODED_PORT))):
533       raise errors.OpPrereqError("You gave %s as secondary IP,"
534                                  " but it does not belong to this host." %
535                                  secondary_ip)
536     self.secondary_ip = secondary_ip
537
538     # checks presence of the volume group given
539     vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
540
541     if vgstatus:
542       raise errors.OpPrereqError("Error: %s" % vgstatus)
543
544     if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
545                     self.op.mac_prefix):
546       raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
547                                  self.op.mac_prefix)
548
549     if self.op.hypervisor_type not in constants.HYPER_TYPES:
550       raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
551                                  self.op.hypervisor_type)
552
553     result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
554     if result.failed:
555       raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
556                                  (self.op.master_netdev,
557                                   result.output.strip()))
558
559     if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
560             os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
561       raise errors.OpPrereqError("Init.d script '%s' missing or not"
562                                  " executable." % constants.NODE_INITD_SCRIPT)
563
564   def Exec(self, feedback_fn):
565     """Initialize the cluster.
566
567     """
568     clustername = self.clustername
569     hostname = self.hostname
570
571     # set up the simple store
572     self.sstore = ss = ssconf.SimpleStore()
573     ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
574     ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
575     ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
576     ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
577     ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
578
579     # set up the inter-node password and certificate
580     _InitGanetiServerSetup(ss)
581
582     # start the master ip
583     rpc.call_node_start_master(hostname.name)
584
585     # set up ssh config and /etc/hosts
586     f = open(constants.SSH_HOST_RSA_PUB, 'r')
587     try:
588       sshline = f.read()
589     finally:
590       f.close()
591     sshkey = sshline.split(" ")[1]
592
593     _AddHostToEtcHosts(hostname.name)
594
595     _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
596
597     _InitSSHSetup(hostname.name)
598
599     # init of cluster config file
600     self.cfg = cfgw = config.ConfigWriter()
601     cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
602                     sshkey, self.op.mac_prefix,
603                     self.op.vg_name, self.op.def_bridge)
604
605
606 class LUDestroyCluster(NoHooksLU):
607   """Logical unit for destroying the cluster.
608
609   """
610   _OP_REQP = []
611
612   def CheckPrereq(self):
613     """Check prerequisites.
614
615     This checks whether the cluster is empty.
616
617     Any errors are signalled by raising errors.OpPrereqError.
618
619     """
620     master = self.sstore.GetMasterNode()
621
622     nodelist = self.cfg.GetNodeList()
623     if len(nodelist) != 1 or nodelist[0] != master:
624       raise errors.OpPrereqError("There are still %d node(s) in"
625                                  " this cluster." % (len(nodelist) - 1))
626     instancelist = self.cfg.GetInstanceList()
627     if instancelist:
628       raise errors.OpPrereqError("There are still %d instance(s) in"
629                                  " this cluster." % len(instancelist))
630
631   def Exec(self, feedback_fn):
632     """Destroys the cluster.
633
634     """
635     master = self.sstore.GetMasterNode()
636     if not rpc.call_node_stop_master(master):
637       raise errors.OpExecError("Could not disable the master role")
638     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
639     utils.CreateBackup(priv_key)
640     utils.CreateBackup(pub_key)
641     rpc.call_node_leave_cluster(master)
642
643
644 class LUVerifyCluster(NoHooksLU):
645   """Verifies the cluster status.
646
647   """
648   _OP_REQP = []
649
650   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
651                   remote_version, feedback_fn):
652     """Run multiple tests against a node.
653
654     Test list:
655       - compares ganeti version
656       - checks vg existance and size > 20G
657       - checks config file checksum
658       - checks ssh to other nodes
659
660     Args:
661       node: name of the node to check
662       file_list: required list of files
663       local_cksum: dictionary of local files and their checksums
664
665     """
666     # compares ganeti version
667     local_version = constants.PROTOCOL_VERSION
668     if not remote_version:
669       feedback_fn(" - ERROR: connection to %s failed" % (node))
670       return True
671
672     if local_version != remote_version:
673       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
674                       (local_version, node, remote_version))
675       return True
676
677     # checks vg existance and size > 20G
678
679     bad = False
680     if not vglist:
681       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
682                       (node,))
683       bad = True
684     else:
685       vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
686       if vgstatus:
687         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
688         bad = True
689
690     # checks config file checksum
691     # checks ssh to any
692
693     if 'filelist' not in node_result:
694       bad = True
695       feedback_fn("  - ERROR: node hasn't returned file checksum data")
696     else:
697       remote_cksum = node_result['filelist']
698       for file_name in file_list:
699         if file_name not in remote_cksum:
700           bad = True
701           feedback_fn("  - ERROR: file '%s' missing" % file_name)
702         elif remote_cksum[file_name] != local_cksum[file_name]:
703           bad = True
704           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
705
706     if 'nodelist' not in node_result:
707       bad = True
708       feedback_fn("  - ERROR: node hasn't returned node connectivity data")
709     else:
710       if node_result['nodelist']:
711         bad = True
712         for node in node_result['nodelist']:
713           feedback_fn("  - ERROR: communication with node '%s': %s" %
714                           (node, node_result['nodelist'][node]))
715     hyp_result = node_result.get('hypervisor', None)
716     if hyp_result is not None:
717       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
718     return bad
719
720   def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
721     """Verify an instance.
722
723     This function checks to see if the required block devices are
724     available on the instance's node.
725
726     """
727     bad = False
728
729     instancelist = self.cfg.GetInstanceList()
730     if not instance in instancelist:
731       feedback_fn("  - ERROR: instance %s not in instance list %s" %
732                       (instance, instancelist))
733       bad = True
734
735     instanceconfig = self.cfg.GetInstanceInfo(instance)
736     node_current = instanceconfig.primary_node
737
738     node_vol_should = {}
739     instanceconfig.MapLVsByNode(node_vol_should)
740
741     for node in node_vol_should:
742       for volume in node_vol_should[node]:
743         if node not in node_vol_is or volume not in node_vol_is[node]:
744           feedback_fn("  - ERROR: volume %s missing on node %s" %
745                           (volume, node))
746           bad = True
747
748     if not instanceconfig.status == 'down':
749       if not instance in node_instance[node_current]:
750         feedback_fn("  - ERROR: instance %s not running on node %s" %
751                         (instance, node_current))
752         bad = True
753
754     for node in node_instance:
755       if (not node == node_current):
756         if instance in node_instance[node]:
757           feedback_fn("  - ERROR: instance %s should not run on node %s" %
758                           (instance, node))
759           bad = True
760
761     return bad
762
763   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
764     """Verify if there are any unknown volumes in the cluster.
765
766     The .os, .swap and backup volumes are ignored. All other volumes are
767     reported as unknown.
768
769     """
770     bad = False
771
772     for node in node_vol_is:
773       for volume in node_vol_is[node]:
774         if node not in node_vol_should or volume not in node_vol_should[node]:
775           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
776                       (volume, node))
777           bad = True
778     return bad
779
780   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
781     """Verify the list of running instances.
782
783     This checks what instances are running but unknown to the cluster.
784
785     """
786     bad = False
787     for node in node_instance:
788       for runninginstance in node_instance[node]:
789         if runninginstance not in instancelist:
790           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
791                           (runninginstance, node))
792           bad = True
793     return bad
794
795   def CheckPrereq(self):
796     """Check prerequisites.
797
798     This has no prerequisites.
799
800     """
801     pass
802
803   def Exec(self, feedback_fn):
804     """Verify integrity of cluster, performing various test on nodes.
805
806     """
807     bad = False
808     feedback_fn("* Verifying global settings")
809     for msg in self.cfg.VerifyConfig():
810       feedback_fn("  - ERROR: %s" % msg)
811
812     vg_name = self.cfg.GetVGName()
813     nodelist = utils.NiceSort(self.cfg.GetNodeList())
814     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
815     node_volume = {}
816     node_instance = {}
817
818     # FIXME: verify OS list
819     # do local checksums
820     file_names = list(self.sstore.GetFileList())
821     file_names.append(constants.SSL_CERT_FILE)
822     file_names.append(constants.CLUSTER_CONF_FILE)
823     local_checksums = utils.FingerprintFiles(file_names)
824
825     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
826     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
827     all_instanceinfo = rpc.call_instance_list(nodelist)
828     all_vglist = rpc.call_vg_list(nodelist)
829     node_verify_param = {
830       'filelist': file_names,
831       'nodelist': nodelist,
832       'hypervisor': None,
833       }
834     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
835     all_rversion = rpc.call_version(nodelist)
836
837     for node in nodelist:
838       feedback_fn("* Verifying node %s" % node)
839       result = self._VerifyNode(node, file_names, local_checksums,
840                                 all_vglist[node], all_nvinfo[node],
841                                 all_rversion[node], feedback_fn)
842       bad = bad or result
843
844       # node_volume
845       volumeinfo = all_volumeinfo[node]
846
847       if isinstance(volumeinfo, basestring):
848         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
849                     (node, volumeinfo[-400:].encode('string_escape')))
850         bad = True
851         node_volume[node] = {}
852       elif not isinstance(volumeinfo, dict):
853         feedback_fn("  - ERROR: connection to %s failed" % (node,))
854         bad = True
855         continue
856       else:
857         node_volume[node] = volumeinfo
858
859       # node_instance
860       nodeinstance = all_instanceinfo[node]
861       if type(nodeinstance) != list:
862         feedback_fn("  - ERROR: connection to %s failed" % (node,))
863         bad = True
864         continue
865
866       node_instance[node] = nodeinstance
867
868     node_vol_should = {}
869
870     for instance in instancelist:
871       feedback_fn("* Verifying instance %s" % instance)
872       result =  self._VerifyInstance(instance, node_volume, node_instance,
873                                      feedback_fn)
874       bad = bad or result
875
876       inst_config = self.cfg.GetInstanceInfo(instance)
877
878       inst_config.MapLVsByNode(node_vol_should)
879
880     feedback_fn("* Verifying orphan volumes")
881     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
882                                        feedback_fn)
883     bad = bad or result
884
885     feedback_fn("* Verifying remaining instances")
886     result = self._VerifyOrphanInstances(instancelist, node_instance,
887                                          feedback_fn)
888     bad = bad or result
889
890     return int(bad)
891
892
893 class LUVerifyDisks(NoHooksLU):
894   """Verifies the cluster disks status.
895
896   """
897   _OP_REQP = []
898
899   def CheckPrereq(self):
900     """Check prerequisites.
901
902     This has no prerequisites.
903
904     """
905     pass
906
907   def Exec(self, feedback_fn):
908     """Verify integrity of cluster disks.
909
910     """
911     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
912
913     vg_name = self.cfg.GetVGName()
914     nodes = utils.NiceSort(self.cfg.GetNodeList())
915     instances = [self.cfg.GetInstanceInfo(name)
916                  for name in self.cfg.GetInstanceList()]
917
918     nv_dict = {}
919     for inst in instances:
920       inst_lvs = {}
921       if (inst.status != "up" or
922           inst.disk_template not in constants.DTS_NET_MIRROR):
923         continue
924       inst.MapLVsByNode(inst_lvs)
925       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
926       for node, vol_list in inst_lvs.iteritems():
927         for vol in vol_list:
928           nv_dict[(node, vol)] = inst
929
930     if not nv_dict:
931       return result
932
933     node_lvs = rpc.call_volume_list(nodes, vg_name)
934
935     to_act = set()
936     for node in nodes:
937       # node_volume
938       lvs = node_lvs[node]
939
940       if isinstance(lvs, basestring):
941         logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
942         res_nlvm[node] = lvs
943       elif not isinstance(lvs, dict):
944         logger.Info("connection to node %s failed or invalid data returned" %
945                     (node,))
946         res_nodes.append(node)
947         continue
948
949       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
950         inst = nv_dict.pop((node, lv_name), None)
951         if (not lv_online and inst is not None
952             and inst.name not in res_instances):
953           res_instances.append(inst.name)
954
955     # any leftover items in nv_dict are missing LVs, let's arrange the
956     # data better
957     for key, inst in nv_dict.iteritems():
958       if inst.name not in res_missing:
959         res_missing[inst.name] = []
960       res_missing[inst.name].append(key)
961
962     return result
963
964
965 class LURenameCluster(LogicalUnit):
966   """Rename the cluster.
967
968   """
969   HPATH = "cluster-rename"
970   HTYPE = constants.HTYPE_CLUSTER
971   _OP_REQP = ["name"]
972
973   def BuildHooksEnv(self):
974     """Build hooks env.
975
976     """
977     env = {
978       "OP_TARGET": self.sstore.GetClusterName(),
979       "NEW_NAME": self.op.name,
980       }
981     mn = self.sstore.GetMasterNode()
982     return env, [mn], [mn]
983
984   def CheckPrereq(self):
985     """Verify that the passed name is a valid one.
986
987     """
988     hostname = utils.HostInfo(self.op.name)
989
990     new_name = hostname.name
991     self.ip = new_ip = hostname.ip
992     old_name = self.sstore.GetClusterName()
993     old_ip = self.sstore.GetMasterIP()
994     if new_name == old_name and new_ip == old_ip:
995       raise errors.OpPrereqError("Neither the name nor the IP address of the"
996                                  " cluster has changed")
997     if new_ip != old_ip:
998       result = utils.RunCmd(["fping", "-q", new_ip])
999       if not result.failed:
1000         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1001                                    " reachable on the network. Aborting." %
1002                                    new_ip)
1003
1004     self.op.name = new_name
1005
1006   def Exec(self, feedback_fn):
1007     """Rename the cluster.
1008
1009     """
1010     clustername = self.op.name
1011     ip = self.ip
1012     ss = self.sstore
1013
1014     # shutdown the master IP
1015     master = ss.GetMasterNode()
1016     if not rpc.call_node_stop_master(master):
1017       raise errors.OpExecError("Could not disable the master role")
1018
1019     try:
1020       # modify the sstore
1021       ss.SetKey(ss.SS_MASTER_IP, ip)
1022       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1023
1024       # Distribute updated ss config to all nodes
1025       myself = self.cfg.GetNodeInfo(master)
1026       dist_nodes = self.cfg.GetNodeList()
1027       if myself.name in dist_nodes:
1028         dist_nodes.remove(myself.name)
1029
1030       logger.Debug("Copying updated ssconf data to all nodes")
1031       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1032         fname = ss.KeyToFilename(keyname)
1033         result = rpc.call_upload_file(dist_nodes, fname)
1034         for to_node in dist_nodes:
1035           if not result[to_node]:
1036             logger.Error("copy of file %s to node %s failed" %
1037                          (fname, to_node))
1038     finally:
1039       if not rpc.call_node_start_master(master):
1040         logger.Error("Could not re-enable the master role on the master,"
1041                      " please restart manually.")
1042
1043
1044 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1045   """Sleep and poll for an instance's disk to sync.
1046
1047   """
1048   if not instance.disks:
1049     return True
1050
1051   if not oneshot:
1052     proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1053
1054   node = instance.primary_node
1055
1056   for dev in instance.disks:
1057     cfgw.SetDiskID(dev, node)
1058
1059   retries = 0
1060   while True:
1061     max_time = 0
1062     done = True
1063     cumul_degraded = False
1064     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1065     if not rstats:
1066       proc.LogWarning("Can't get any data from node %s" % node)
1067       retries += 1
1068       if retries >= 10:
1069         raise errors.RemoteError("Can't contact node %s for mirror data,"
1070                                  " aborting." % node)
1071       time.sleep(6)
1072       continue
1073     retries = 0
1074     for i in range(len(rstats)):
1075       mstat = rstats[i]
1076       if mstat is None:
1077         proc.LogWarning("Can't compute data for node %s/%s" %
1078                         (node, instance.disks[i].iv_name))
1079         continue
1080       # we ignore the ldisk parameter
1081       perc_done, est_time, is_degraded, _ = mstat
1082       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1083       if perc_done is not None:
1084         done = False
1085         if est_time is not None:
1086           rem_time = "%d estimated seconds remaining" % est_time
1087           max_time = est_time
1088         else:
1089           rem_time = "no time estimate"
1090         proc.LogInfo("- device %s: %5.2f%% done, %s" %
1091                      (instance.disks[i].iv_name, perc_done, rem_time))
1092     if done or oneshot:
1093       break
1094
1095     if unlock:
1096       utils.Unlock('cmd')
1097     try:
1098       time.sleep(min(60, max_time))
1099     finally:
1100       if unlock:
1101         utils.Lock('cmd')
1102
1103   if done:
1104     proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1105   return not cumul_degraded
1106
1107
1108 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1109   """Check that mirrors are not degraded.
1110
1111   The ldisk parameter, if True, will change the test from the
1112   is_degraded attribute (which represents overall non-ok status for
1113   the device(s)) to the ldisk (representing the local storage status).
1114
1115   """
1116   cfgw.SetDiskID(dev, node)
1117   if ldisk:
1118     idx = 6
1119   else:
1120     idx = 5
1121
1122   result = True
1123   if on_primary or dev.AssembleOnSecondary():
1124     rstats = rpc.call_blockdev_find(node, dev)
1125     if not rstats:
1126       logger.ToStderr("Can't get any data from node %s" % node)
1127       result = False
1128     else:
1129       result = result and (not rstats[idx])
1130   if dev.children:
1131     for child in dev.children:
1132       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1133
1134   return result
1135
1136
1137 class LUDiagnoseOS(NoHooksLU):
1138   """Logical unit for OS diagnose/query.
1139
1140   """
1141   _OP_REQP = []
1142
1143   def CheckPrereq(self):
1144     """Check prerequisites.
1145
1146     This always succeeds, since this is a pure query LU.
1147
1148     """
1149     return
1150
1151   def Exec(self, feedback_fn):
1152     """Compute the list of OSes.
1153
1154     """
1155     node_list = self.cfg.GetNodeList()
1156     node_data = rpc.call_os_diagnose(node_list)
1157     if node_data == False:
1158       raise errors.OpExecError("Can't gather the list of OSes")
1159     return node_data
1160
1161
1162 class LURemoveNode(LogicalUnit):
1163   """Logical unit for removing a node.
1164
1165   """
1166   HPATH = "node-remove"
1167   HTYPE = constants.HTYPE_NODE
1168   _OP_REQP = ["node_name"]
1169
1170   def BuildHooksEnv(self):
1171     """Build hooks env.
1172
1173     This doesn't run on the target node in the pre phase as a failed
1174     node would not allows itself to run.
1175
1176     """
1177     env = {
1178       "OP_TARGET": self.op.node_name,
1179       "NODE_NAME": self.op.node_name,
1180       }
1181     all_nodes = self.cfg.GetNodeList()
1182     all_nodes.remove(self.op.node_name)
1183     return env, all_nodes, all_nodes
1184
1185   def CheckPrereq(self):
1186     """Check prerequisites.
1187
1188     This checks:
1189      - the node exists in the configuration
1190      - it does not have primary or secondary instances
1191      - it's not the master
1192
1193     Any errors are signalled by raising errors.OpPrereqError.
1194
1195     """
1196     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1197     if node is None:
1198       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1199
1200     instance_list = self.cfg.GetInstanceList()
1201
1202     masternode = self.sstore.GetMasterNode()
1203     if node.name == masternode:
1204       raise errors.OpPrereqError("Node is the master node,"
1205                                  " you need to failover first.")
1206
1207     for instance_name in instance_list:
1208       instance = self.cfg.GetInstanceInfo(instance_name)
1209       if node.name == instance.primary_node:
1210         raise errors.OpPrereqError("Instance %s still running on the node,"
1211                                    " please remove first." % instance_name)
1212       if node.name in instance.secondary_nodes:
1213         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1214                                    " please remove first." % instance_name)
1215     self.op.node_name = node.name
1216     self.node = node
1217
1218   def Exec(self, feedback_fn):
1219     """Removes the node from the cluster.
1220
1221     """
1222     node = self.node
1223     logger.Info("stopping the node daemon and removing configs from node %s" %
1224                 node.name)
1225
1226     rpc.call_node_leave_cluster(node.name)
1227
1228     ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1229
1230     logger.Info("Removing node %s from config" % node.name)
1231
1232     self.cfg.RemoveNode(node.name)
1233
1234     _RemoveHostFromEtcHosts(node.name)
1235
1236
1237 class LUQueryNodes(NoHooksLU):
1238   """Logical unit for querying nodes.
1239
1240   """
1241   _OP_REQP = ["output_fields", "names"]
1242
1243   def CheckPrereq(self):
1244     """Check prerequisites.
1245
1246     This checks that the fields required are valid output fields.
1247
1248     """
1249     self.dynamic_fields = frozenset(["dtotal", "dfree",
1250                                      "mtotal", "mnode", "mfree",
1251                                      "bootid"])
1252
1253     _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1254                                "pinst_list", "sinst_list",
1255                                "pip", "sip"],
1256                        dynamic=self.dynamic_fields,
1257                        selected=self.op.output_fields)
1258
1259     self.wanted = _GetWantedNodes(self, self.op.names)
1260
1261   def Exec(self, feedback_fn):
1262     """Computes the list of nodes and their attributes.
1263
1264     """
1265     nodenames = self.wanted
1266     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1267
1268     # begin data gathering
1269
1270     if self.dynamic_fields.intersection(self.op.output_fields):
1271       live_data = {}
1272       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1273       for name in nodenames:
1274         nodeinfo = node_data.get(name, None)
1275         if nodeinfo:
1276           live_data[name] = {
1277             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1278             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1279             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1280             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1281             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1282             "bootid": nodeinfo['bootid'],
1283             }
1284         else:
1285           live_data[name] = {}
1286     else:
1287       live_data = dict.fromkeys(nodenames, {})
1288
1289     node_to_primary = dict([(name, set()) for name in nodenames])
1290     node_to_secondary = dict([(name, set()) for name in nodenames])
1291
1292     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1293                              "sinst_cnt", "sinst_list"))
1294     if inst_fields & frozenset(self.op.output_fields):
1295       instancelist = self.cfg.GetInstanceList()
1296
1297       for instance_name in instancelist:
1298         inst = self.cfg.GetInstanceInfo(instance_name)
1299         if inst.primary_node in node_to_primary:
1300           node_to_primary[inst.primary_node].add(inst.name)
1301         for secnode in inst.secondary_nodes:
1302           if secnode in node_to_secondary:
1303             node_to_secondary[secnode].add(inst.name)
1304
1305     # end data gathering
1306
1307     output = []
1308     for node in nodelist:
1309       node_output = []
1310       for field in self.op.output_fields:
1311         if field == "name":
1312           val = node.name
1313         elif field == "pinst_list":
1314           val = list(node_to_primary[node.name])
1315         elif field == "sinst_list":
1316           val = list(node_to_secondary[node.name])
1317         elif field == "pinst_cnt":
1318           val = len(node_to_primary[node.name])
1319         elif field == "sinst_cnt":
1320           val = len(node_to_secondary[node.name])
1321         elif field == "pip":
1322           val = node.primary_ip
1323         elif field == "sip":
1324           val = node.secondary_ip
1325         elif field in self.dynamic_fields:
1326           val = live_data[node.name].get(field, None)
1327         else:
1328           raise errors.ParameterError(field)
1329         node_output.append(val)
1330       output.append(node_output)
1331
1332     return output
1333
1334
1335 class LUQueryNodeVolumes(NoHooksLU):
1336   """Logical unit for getting volumes on node(s).
1337
1338   """
1339   _OP_REQP = ["nodes", "output_fields"]
1340
1341   def CheckPrereq(self):
1342     """Check prerequisites.
1343
1344     This checks that the fields required are valid output fields.
1345
1346     """
1347     self.nodes = _GetWantedNodes(self, self.op.nodes)
1348
1349     _CheckOutputFields(static=["node"],
1350                        dynamic=["phys", "vg", "name", "size", "instance"],
1351                        selected=self.op.output_fields)
1352
1353
1354   def Exec(self, feedback_fn):
1355     """Computes the list of nodes and their attributes.
1356
1357     """
1358     nodenames = self.nodes
1359     volumes = rpc.call_node_volumes(nodenames)
1360
1361     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1362              in self.cfg.GetInstanceList()]
1363
1364     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1365
1366     output = []
1367     for node in nodenames:
1368       if node not in volumes or not volumes[node]:
1369         continue
1370
1371       node_vols = volumes[node][:]
1372       node_vols.sort(key=lambda vol: vol['dev'])
1373
1374       for vol in node_vols:
1375         node_output = []
1376         for field in self.op.output_fields:
1377           if field == "node":
1378             val = node
1379           elif field == "phys":
1380             val = vol['dev']
1381           elif field == "vg":
1382             val = vol['vg']
1383           elif field == "name":
1384             val = vol['name']
1385           elif field == "size":
1386             val = int(float(vol['size']))
1387           elif field == "instance":
1388             for inst in ilist:
1389               if node not in lv_by_node[inst]:
1390                 continue
1391               if vol['name'] in lv_by_node[inst][node]:
1392                 val = inst.name
1393                 break
1394             else:
1395               val = '-'
1396           else:
1397             raise errors.ParameterError(field)
1398           node_output.append(str(val))
1399
1400         output.append(node_output)
1401
1402     return output
1403
1404
1405 class LUAddNode(LogicalUnit):
1406   """Logical unit for adding node to the cluster.
1407
1408   """
1409   HPATH = "node-add"
1410   HTYPE = constants.HTYPE_NODE
1411   _OP_REQP = ["node_name"]
1412
1413   def BuildHooksEnv(self):
1414     """Build hooks env.
1415
1416     This will run on all nodes before, and on all nodes + the new node after.
1417
1418     """
1419     env = {
1420       "OP_TARGET": self.op.node_name,
1421       "NODE_NAME": self.op.node_name,
1422       "NODE_PIP": self.op.primary_ip,
1423       "NODE_SIP": self.op.secondary_ip,
1424       }
1425     nodes_0 = self.cfg.GetNodeList()
1426     nodes_1 = nodes_0 + [self.op.node_name, ]
1427     return env, nodes_0, nodes_1
1428
1429   def CheckPrereq(self):
1430     """Check prerequisites.
1431
1432     This checks:
1433      - the new node is not already in the config
1434      - it is resolvable
1435      - its parameters (single/dual homed) matches the cluster
1436
1437     Any errors are signalled by raising errors.OpPrereqError.
1438
1439     """
1440     node_name = self.op.node_name
1441     cfg = self.cfg
1442
1443     dns_data = utils.HostInfo(node_name)
1444
1445     node = dns_data.name
1446     primary_ip = self.op.primary_ip = dns_data.ip
1447     secondary_ip = getattr(self.op, "secondary_ip", None)
1448     if secondary_ip is None:
1449       secondary_ip = primary_ip
1450     if not utils.IsValidIP(secondary_ip):
1451       raise errors.OpPrereqError("Invalid secondary IP given")
1452     self.op.secondary_ip = secondary_ip
1453     node_list = cfg.GetNodeList()
1454     if node in node_list:
1455       raise errors.OpPrereqError("Node %s is already in the configuration"
1456                                  % node)
1457
1458     for existing_node_name in node_list:
1459       existing_node = cfg.GetNodeInfo(existing_node_name)
1460       if (existing_node.primary_ip == primary_ip or
1461           existing_node.secondary_ip == primary_ip or
1462           existing_node.primary_ip == secondary_ip or
1463           existing_node.secondary_ip == secondary_ip):
1464         raise errors.OpPrereqError("New node ip address(es) conflict with"
1465                                    " existing node %s" % existing_node.name)
1466
1467     # check that the type of the node (single versus dual homed) is the
1468     # same as for the master
1469     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1470     master_singlehomed = myself.secondary_ip == myself.primary_ip
1471     newbie_singlehomed = secondary_ip == primary_ip
1472     if master_singlehomed != newbie_singlehomed:
1473       if master_singlehomed:
1474         raise errors.OpPrereqError("The master has no private ip but the"
1475                                    " new node has one")
1476       else:
1477         raise errors.OpPrereqError("The master has a private ip but the"
1478                                    " new node doesn't have one")
1479
1480     # checks reachablity
1481     if not utils.TcpPing(utils.HostInfo().name,
1482                          primary_ip,
1483                          constants.DEFAULT_NODED_PORT):
1484       raise errors.OpPrereqError("Node not reachable by ping")
1485
1486     if not newbie_singlehomed:
1487       # check reachability from my secondary ip to newbie's secondary ip
1488       if not utils.TcpPing(myself.secondary_ip,
1489                            secondary_ip,
1490                            constants.DEFAULT_NODED_PORT):
1491         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1492                                    " based ping to noded port")
1493
1494     self.new_node = objects.Node(name=node,
1495                                  primary_ip=primary_ip,
1496                                  secondary_ip=secondary_ip)
1497
1498     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1499       if not os.path.exists(constants.VNC_PASSWORD_FILE):
1500         raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1501                                    constants.VNC_PASSWORD_FILE)
1502
1503   def Exec(self, feedback_fn):
1504     """Adds the new node to the cluster.
1505
1506     """
1507     new_node = self.new_node
1508     node = new_node.name
1509
1510     # set up inter-node password and certificate and restarts the node daemon
1511     gntpass = self.sstore.GetNodeDaemonPassword()
1512     if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1513       raise errors.OpExecError("ganeti password corruption detected")
1514     f = open(constants.SSL_CERT_FILE)
1515     try:
1516       gntpem = f.read(8192)
1517     finally:
1518       f.close()
1519     # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1520     # so we use this to detect an invalid certificate; as long as the
1521     # cert doesn't contain this, the here-document will be correctly
1522     # parsed by the shell sequence below
1523     if re.search('^!EOF\.', gntpem, re.MULTILINE):
1524       raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1525     if not gntpem.endswith("\n"):
1526       raise errors.OpExecError("PEM must end with newline")
1527     logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1528
1529     # and then connect with ssh to set password and start ganeti-noded
1530     # note that all the below variables are sanitized at this point,
1531     # either by being constants or by the checks above
1532     ss = self.sstore
1533     mycommand = ("umask 077 && "
1534                  "echo '%s' > '%s' && "
1535                  "cat > '%s' << '!EOF.' && \n"
1536                  "%s!EOF.\n%s restart" %
1537                  (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1538                   constants.SSL_CERT_FILE, gntpem,
1539                   constants.NODE_INITD_SCRIPT))
1540
1541     result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1542     if result.failed:
1543       raise errors.OpExecError("Remote command on node %s, error: %s,"
1544                                " output: %s" %
1545                                (node, result.fail_reason, result.output))
1546
1547     # check connectivity
1548     time.sleep(4)
1549
1550     result = rpc.call_version([node])[node]
1551     if result:
1552       if constants.PROTOCOL_VERSION == result:
1553         logger.Info("communication to node %s fine, sw version %s match" %
1554                     (node, result))
1555       else:
1556         raise errors.OpExecError("Version mismatch master version %s,"
1557                                  " node version %s" %
1558                                  (constants.PROTOCOL_VERSION, result))
1559     else:
1560       raise errors.OpExecError("Cannot get version from the new node")
1561
1562     # setup ssh on node
1563     logger.Info("copy ssh key to node %s" % node)
1564     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1565     keyarray = []
1566     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1567                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1568                 priv_key, pub_key]
1569
1570     for i in keyfiles:
1571       f = open(i, 'r')
1572       try:
1573         keyarray.append(f.read())
1574       finally:
1575         f.close()
1576
1577     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1578                                keyarray[3], keyarray[4], keyarray[5])
1579
1580     if not result:
1581       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1582
1583     # Add node to our /etc/hosts, and add key to known_hosts
1584     _AddHostToEtcHosts(new_node.name)
1585
1586     _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1587                       self.cfg.GetHostKey())
1588
1589     if new_node.secondary_ip != new_node.primary_ip:
1590       if not rpc.call_node_tcp_ping(new_node.name,
1591                                     constants.LOCALHOST_IP_ADDRESS,
1592                                     new_node.secondary_ip,
1593                                     constants.DEFAULT_NODED_PORT,
1594                                     10, False):
1595         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1596                                  " you gave (%s). Please fix and re-run this"
1597                                  " command." % new_node.secondary_ip)
1598
1599     success, msg = ssh.VerifyNodeHostname(node)
1600     if not success:
1601       raise errors.OpExecError("Node '%s' claims it has a different hostname"
1602                                " than the one the resolver gives: %s."
1603                                " Please fix and re-run this command." %
1604                                (node, msg))
1605
1606     # Distribute updated /etc/hosts and known_hosts to all nodes,
1607     # including the node just added
1608     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1609     dist_nodes = self.cfg.GetNodeList() + [node]
1610     if myself.name in dist_nodes:
1611       dist_nodes.remove(myself.name)
1612
1613     logger.Debug("Copying hosts and known_hosts to all nodes")
1614     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1615       result = rpc.call_upload_file(dist_nodes, fname)
1616       for to_node in dist_nodes:
1617         if not result[to_node]:
1618           logger.Error("copy of file %s to node %s failed" %
1619                        (fname, to_node))
1620
1621     to_copy = ss.GetFileList()
1622     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1623       to_copy.append(constants.VNC_PASSWORD_FILE)
1624     for fname in to_copy:
1625       if not ssh.CopyFileToNode(node, fname):
1626         logger.Error("could not copy file %s to node %s" % (fname, node))
1627
1628     logger.Info("adding node %s to cluster.conf" % node)
1629     self.cfg.AddNode(new_node)
1630
1631
1632 class LUMasterFailover(LogicalUnit):
1633   """Failover the master node to the current node.
1634
1635   This is a special LU in that it must run on a non-master node.
1636
1637   """
1638   HPATH = "master-failover"
1639   HTYPE = constants.HTYPE_CLUSTER
1640   REQ_MASTER = False
1641   _OP_REQP = []
1642
1643   def BuildHooksEnv(self):
1644     """Build hooks env.
1645
1646     This will run on the new master only in the pre phase, and on all
1647     the nodes in the post phase.
1648
1649     """
1650     env = {
1651       "OP_TARGET": self.new_master,
1652       "NEW_MASTER": self.new_master,
1653       "OLD_MASTER": self.old_master,
1654       }
1655     return env, [self.new_master], self.cfg.GetNodeList()
1656
1657   def CheckPrereq(self):
1658     """Check prerequisites.
1659
1660     This checks that we are not already the master.
1661
1662     """
1663     self.new_master = utils.HostInfo().name
1664     self.old_master = self.sstore.GetMasterNode()
1665
1666     if self.old_master == self.new_master:
1667       raise errors.OpPrereqError("This commands must be run on the node"
1668                                  " where you want the new master to be."
1669                                  " %s is already the master" %
1670                                  self.old_master)
1671
1672   def Exec(self, feedback_fn):
1673     """Failover the master node.
1674
1675     This command, when run on a non-master node, will cause the current
1676     master to cease being master, and the non-master to become new
1677     master.
1678
1679     """
1680     #TODO: do not rely on gethostname returning the FQDN
1681     logger.Info("setting master to %s, old master: %s" %
1682                 (self.new_master, self.old_master))
1683
1684     if not rpc.call_node_stop_master(self.old_master):
1685       logger.Error("could disable the master role on the old master"
1686                    " %s, please disable manually" % self.old_master)
1687
1688     ss = self.sstore
1689     ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1690     if not rpc.call_upload_file(self.cfg.GetNodeList(),
1691                                 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1692       logger.Error("could not distribute the new simple store master file"
1693                    " to the other nodes, please check.")
1694
1695     if not rpc.call_node_start_master(self.new_master):
1696       logger.Error("could not start the master role on the new master"
1697                    " %s, please check" % self.new_master)
1698       feedback_fn("Error in activating the master IP on the new master,"
1699                   " please fix manually.")
1700
1701
1702
1703 class LUQueryClusterInfo(NoHooksLU):
1704   """Query cluster configuration.
1705
1706   """
1707   _OP_REQP = []
1708   REQ_MASTER = False
1709
1710   def CheckPrereq(self):
1711     """No prerequsites needed for this LU.
1712
1713     """
1714     pass
1715
1716   def Exec(self, feedback_fn):
1717     """Return cluster config.
1718
1719     """
1720     result = {
1721       "name": self.sstore.GetClusterName(),
1722       "software_version": constants.RELEASE_VERSION,
1723       "protocol_version": constants.PROTOCOL_VERSION,
1724       "config_version": constants.CONFIG_VERSION,
1725       "os_api_version": constants.OS_API_VERSION,
1726       "export_version": constants.EXPORT_VERSION,
1727       "master": self.sstore.GetMasterNode(),
1728       "architecture": (platform.architecture()[0], platform.machine()),
1729       }
1730
1731     return result
1732
1733
1734 class LUClusterCopyFile(NoHooksLU):
1735   """Copy file to cluster.
1736
1737   """
1738   _OP_REQP = ["nodes", "filename"]
1739
1740   def CheckPrereq(self):
1741     """Check prerequisites.
1742
1743     It should check that the named file exists and that the given list
1744     of nodes is valid.
1745
1746     """
1747     if not os.path.exists(self.op.filename):
1748       raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1749
1750     self.nodes = _GetWantedNodes(self, self.op.nodes)
1751
1752   def Exec(self, feedback_fn):
1753     """Copy a file from master to some nodes.
1754
1755     Args:
1756       opts - class with options as members
1757       args - list containing a single element, the file name
1758     Opts used:
1759       nodes - list containing the name of target nodes; if empty, all nodes
1760
1761     """
1762     filename = self.op.filename
1763
1764     myname = utils.HostInfo().name
1765
1766     for node in self.nodes:
1767       if node == myname:
1768         continue
1769       if not ssh.CopyFileToNode(node, filename):
1770         logger.Error("Copy of file %s to node %s failed" % (filename, node))
1771
1772
1773 class LUDumpClusterConfig(NoHooksLU):
1774   """Return a text-representation of the cluster-config.
1775
1776   """
1777   _OP_REQP = []
1778
1779   def CheckPrereq(self):
1780     """No prerequisites.
1781
1782     """
1783     pass
1784
1785   def Exec(self, feedback_fn):
1786     """Dump a representation of the cluster config to the standard output.
1787
1788     """
1789     return self.cfg.DumpConfig()
1790
1791
1792 class LURunClusterCommand(NoHooksLU):
1793   """Run a command on some nodes.
1794
1795   """
1796   _OP_REQP = ["command", "nodes"]
1797
1798   def CheckPrereq(self):
1799     """Check prerequisites.
1800
1801     It checks that the given list of nodes is valid.
1802
1803     """
1804     self.nodes = _GetWantedNodes(self, self.op.nodes)
1805
1806   def Exec(self, feedback_fn):
1807     """Run a command on some nodes.
1808
1809     """
1810     data = []
1811     for node in self.nodes:
1812       result = ssh.SSHCall(node, "root", self.op.command)
1813       data.append((node, result.output, result.exit_code))
1814
1815     return data
1816
1817
1818 class LUActivateInstanceDisks(NoHooksLU):
1819   """Bring up an instance's disks.
1820
1821   """
1822   _OP_REQP = ["instance_name"]
1823
1824   def CheckPrereq(self):
1825     """Check prerequisites.
1826
1827     This checks that the instance is in the cluster.
1828
1829     """
1830     instance = self.cfg.GetInstanceInfo(
1831       self.cfg.ExpandInstanceName(self.op.instance_name))
1832     if instance is None:
1833       raise errors.OpPrereqError("Instance '%s' not known" %
1834                                  self.op.instance_name)
1835     self.instance = instance
1836
1837
1838   def Exec(self, feedback_fn):
1839     """Activate the disks.
1840
1841     """
1842     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1843     if not disks_ok:
1844       raise errors.OpExecError("Cannot activate block devices")
1845
1846     return disks_info
1847
1848
1849 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1850   """Prepare the block devices for an instance.
1851
1852   This sets up the block devices on all nodes.
1853
1854   Args:
1855     instance: a ganeti.objects.Instance object
1856     ignore_secondaries: if true, errors on secondary nodes won't result
1857                         in an error return from the function
1858
1859   Returns:
1860     false if the operation failed
1861     list of (host, instance_visible_name, node_visible_name) if the operation
1862          suceeded with the mapping from node devices to instance devices
1863   """
1864   device_info = []
1865   disks_ok = True
1866   iname = instance.name
1867   # With the two passes mechanism we try to reduce the window of
1868   # opportunity for the race condition of switching DRBD to primary
1869   # before handshaking occured, but we do not eliminate it
1870
1871   # The proper fix would be to wait (with some limits) until the
1872   # connection has been made and drbd transitions from WFConnection
1873   # into any other network-connected state (Connected, SyncTarget,
1874   # SyncSource, etc.)
1875
1876   # 1st pass, assemble on all nodes in secondary mode
1877   for inst_disk in instance.disks:
1878     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1879       cfg.SetDiskID(node_disk, node)
1880       result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1881       if not result:
1882         logger.Error("could not prepare block device %s on node %s"
1883                      " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1884         if not ignore_secondaries:
1885           disks_ok = False
1886
1887   # FIXME: race condition on drbd migration to primary
1888
1889   # 2nd pass, do only the primary node
1890   for inst_disk in instance.disks:
1891     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1892       if node != instance.primary_node:
1893         continue
1894       cfg.SetDiskID(node_disk, node)
1895       result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1896       if not result:
1897         logger.Error("could not prepare block device %s on node %s"
1898                      " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1899         disks_ok = False
1900     device_info.append((instance.primary_node, inst_disk.iv_name, result))
1901
1902   # leave the disks configured for the primary node
1903   # this is a workaround that would be fixed better by
1904   # improving the logical/physical id handling
1905   for disk in instance.disks:
1906     cfg.SetDiskID(disk, instance.primary_node)
1907
1908   return disks_ok, device_info
1909
1910
1911 def _StartInstanceDisks(cfg, instance, force):
1912   """Start the disks of an instance.
1913
1914   """
1915   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1916                                            ignore_secondaries=force)
1917   if not disks_ok:
1918     _ShutdownInstanceDisks(instance, cfg)
1919     if force is not None and not force:
1920       logger.Error("If the message above refers to a secondary node,"
1921                    " you can retry the operation using '--force'.")
1922     raise errors.OpExecError("Disk consistency error")
1923
1924
1925 class LUDeactivateInstanceDisks(NoHooksLU):
1926   """Shutdown an instance's disks.
1927
1928   """
1929   _OP_REQP = ["instance_name"]
1930
1931   def CheckPrereq(self):
1932     """Check prerequisites.
1933
1934     This checks that the instance is in the cluster.
1935
1936     """
1937     instance = self.cfg.GetInstanceInfo(
1938       self.cfg.ExpandInstanceName(self.op.instance_name))
1939     if instance is None:
1940       raise errors.OpPrereqError("Instance '%s' not known" %
1941                                  self.op.instance_name)
1942     self.instance = instance
1943
1944   def Exec(self, feedback_fn):
1945     """Deactivate the disks
1946
1947     """
1948     instance = self.instance
1949     ins_l = rpc.call_instance_list([instance.primary_node])
1950     ins_l = ins_l[instance.primary_node]
1951     if not type(ins_l) is list:
1952       raise errors.OpExecError("Can't contact node '%s'" %
1953                                instance.primary_node)
1954
1955     if self.instance.name in ins_l:
1956       raise errors.OpExecError("Instance is running, can't shutdown"
1957                                " block devices.")
1958
1959     _ShutdownInstanceDisks(instance, self.cfg)
1960
1961
1962 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1963   """Shutdown block devices of an instance.
1964
1965   This does the shutdown on all nodes of the instance.
1966
1967   If the ignore_primary is false, errors on the primary node are
1968   ignored.
1969
1970   """
1971   result = True
1972   for disk in instance.disks:
1973     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1974       cfg.SetDiskID(top_disk, node)
1975       if not rpc.call_blockdev_shutdown(node, top_disk):
1976         logger.Error("could not shutdown block device %s on node %s" %
1977                      (disk.iv_name, node))
1978         if not ignore_primary or node != instance.primary_node:
1979           result = False
1980   return result
1981
1982
1983 def _CheckNodeFreeMemory(cfg, node, reason, requested):
1984   """Checks if a node has enough free memory.
1985
1986   This function check if a given node has the needed amount of free
1987   memory. In case the node has less memory or we cannot get the
1988   information from the node, this function raise an OpPrereqError
1989   exception.
1990
1991   Args:
1992     - cfg: a ConfigWriter instance
1993     - node: the node name
1994     - reason: string to use in the error message
1995     - requested: the amount of memory in MiB
1996
1997   """
1998   nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
1999   if not nodeinfo or not isinstance(nodeinfo, dict):
2000     raise errors.OpPrereqError("Could not contact node %s for resource"
2001                              " information" % (node,))
2002
2003   free_mem = nodeinfo[node].get('memory_free')
2004   if not isinstance(free_mem, int):
2005     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2006                              " was '%s'" % (node, free_mem))
2007   if requested > free_mem:
2008     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2009                              " needed %s MiB, available %s MiB" %
2010                              (node, reason, requested, free_mem))
2011
2012
2013 class LUStartupInstance(LogicalUnit):
2014   """Starts an instance.
2015
2016   """
2017   HPATH = "instance-start"
2018   HTYPE = constants.HTYPE_INSTANCE
2019   _OP_REQP = ["instance_name", "force"]
2020
2021   def BuildHooksEnv(self):
2022     """Build hooks env.
2023
2024     This runs on master, primary and secondary nodes of the instance.
2025
2026     """
2027     env = {
2028       "FORCE": self.op.force,
2029       }
2030     env.update(_BuildInstanceHookEnvByObject(self.instance))
2031     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2032           list(self.instance.secondary_nodes))
2033     return env, nl, nl
2034
2035   def CheckPrereq(self):
2036     """Check prerequisites.
2037
2038     This checks that the instance is in the cluster.
2039
2040     """
2041     instance = self.cfg.GetInstanceInfo(
2042       self.cfg.ExpandInstanceName(self.op.instance_name))
2043     if instance is None:
2044       raise errors.OpPrereqError("Instance '%s' not known" %
2045                                  self.op.instance_name)
2046
2047     # check bridges existance
2048     _CheckInstanceBridgesExist(instance)
2049
2050     _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2051                          "starting instance %s" % instance.name,
2052                          instance.memory)
2053
2054     self.instance = instance
2055     self.op.instance_name = instance.name
2056
2057   def Exec(self, feedback_fn):
2058     """Start the instance.
2059
2060     """
2061     instance = self.instance
2062     force = self.op.force
2063     extra_args = getattr(self.op, "extra_args", "")
2064
2065     node_current = instance.primary_node
2066
2067     _StartInstanceDisks(self.cfg, instance, force)
2068
2069     if not rpc.call_instance_start(node_current, instance, extra_args):
2070       _ShutdownInstanceDisks(instance, self.cfg)
2071       raise errors.OpExecError("Could not start instance")
2072
2073     self.cfg.MarkInstanceUp(instance.name)
2074
2075
2076 class LURebootInstance(LogicalUnit):
2077   """Reboot an instance.
2078
2079   """
2080   HPATH = "instance-reboot"
2081   HTYPE = constants.HTYPE_INSTANCE
2082   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2083
2084   def BuildHooksEnv(self):
2085     """Build hooks env.
2086
2087     This runs on master, primary and secondary nodes of the instance.
2088
2089     """
2090     env = {
2091       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2092       }
2093     env.update(_BuildInstanceHookEnvByObject(self.instance))
2094     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2095           list(self.instance.secondary_nodes))
2096     return env, nl, nl
2097
2098   def CheckPrereq(self):
2099     """Check prerequisites.
2100
2101     This checks that the instance is in the cluster.
2102
2103     """
2104     instance = self.cfg.GetInstanceInfo(
2105       self.cfg.ExpandInstanceName(self.op.instance_name))
2106     if instance is None:
2107       raise errors.OpPrereqError("Instance '%s' not known" %
2108                                  self.op.instance_name)
2109
2110     # check bridges existance
2111     _CheckInstanceBridgesExist(instance)
2112
2113     self.instance = instance
2114     self.op.instance_name = instance.name
2115
2116   def Exec(self, feedback_fn):
2117     """Reboot the instance.
2118
2119     """
2120     instance = self.instance
2121     ignore_secondaries = self.op.ignore_secondaries
2122     reboot_type = self.op.reboot_type
2123     extra_args = getattr(self.op, "extra_args", "")
2124
2125     node_current = instance.primary_node
2126
2127     if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2128                            constants.INSTANCE_REBOOT_HARD,
2129                            constants.INSTANCE_REBOOT_FULL]:
2130       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2131                                   (constants.INSTANCE_REBOOT_SOFT,
2132                                    constants.INSTANCE_REBOOT_HARD,
2133                                    constants.INSTANCE_REBOOT_FULL))
2134
2135     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2136                        constants.INSTANCE_REBOOT_HARD]:
2137       if not rpc.call_instance_reboot(node_current, instance,
2138                                       reboot_type, extra_args):
2139         raise errors.OpExecError("Could not reboot instance")
2140     else:
2141       if not rpc.call_instance_shutdown(node_current, instance):
2142         raise errors.OpExecError("could not shutdown instance for full reboot")
2143       _ShutdownInstanceDisks(instance, self.cfg)
2144       _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2145       if not rpc.call_instance_start(node_current, instance, extra_args):
2146         _ShutdownInstanceDisks(instance, self.cfg)
2147         raise errors.OpExecError("Could not start instance for full reboot")
2148
2149     self.cfg.MarkInstanceUp(instance.name)
2150
2151
2152 class LUShutdownInstance(LogicalUnit):
2153   """Shutdown an instance.
2154
2155   """
2156   HPATH = "instance-stop"
2157   HTYPE = constants.HTYPE_INSTANCE
2158   _OP_REQP = ["instance_name"]
2159
2160   def BuildHooksEnv(self):
2161     """Build hooks env.
2162
2163     This runs on master, primary and secondary nodes of the instance.
2164
2165     """
2166     env = _BuildInstanceHookEnvByObject(self.instance)
2167     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2168           list(self.instance.secondary_nodes))
2169     return env, nl, nl
2170
2171   def CheckPrereq(self):
2172     """Check prerequisites.
2173
2174     This checks that the instance is in the cluster.
2175
2176     """
2177     instance = self.cfg.GetInstanceInfo(
2178       self.cfg.ExpandInstanceName(self.op.instance_name))
2179     if instance is None:
2180       raise errors.OpPrereqError("Instance '%s' not known" %
2181                                  self.op.instance_name)
2182     self.instance = instance
2183
2184   def Exec(self, feedback_fn):
2185     """Shutdown the instance.
2186
2187     """
2188     instance = self.instance
2189     node_current = instance.primary_node
2190     if not rpc.call_instance_shutdown(node_current, instance):
2191       logger.Error("could not shutdown instance")
2192
2193     self.cfg.MarkInstanceDown(instance.name)
2194     _ShutdownInstanceDisks(instance, self.cfg)
2195
2196
2197 class LUReinstallInstance(LogicalUnit):
2198   """Reinstall an instance.
2199
2200   """
2201   HPATH = "instance-reinstall"
2202   HTYPE = constants.HTYPE_INSTANCE
2203   _OP_REQP = ["instance_name"]
2204
2205   def BuildHooksEnv(self):
2206     """Build hooks env.
2207
2208     This runs on master, primary and secondary nodes of the instance.
2209
2210     """
2211     env = _BuildInstanceHookEnvByObject(self.instance)
2212     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2213           list(self.instance.secondary_nodes))
2214     return env, nl, nl
2215
2216   def CheckPrereq(self):
2217     """Check prerequisites.
2218
2219     This checks that the instance is in the cluster and is not running.
2220
2221     """
2222     instance = self.cfg.GetInstanceInfo(
2223       self.cfg.ExpandInstanceName(self.op.instance_name))
2224     if instance is None:
2225       raise errors.OpPrereqError("Instance '%s' not known" %
2226                                  self.op.instance_name)
2227     if instance.disk_template == constants.DT_DISKLESS:
2228       raise errors.OpPrereqError("Instance '%s' has no disks" %
2229                                  self.op.instance_name)
2230     if instance.status != "down":
2231       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2232                                  self.op.instance_name)
2233     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2234     if remote_info:
2235       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2236                                  (self.op.instance_name,
2237                                   instance.primary_node))
2238
2239     self.op.os_type = getattr(self.op, "os_type", None)
2240     if self.op.os_type is not None:
2241       # OS verification
2242       pnode = self.cfg.GetNodeInfo(
2243         self.cfg.ExpandNodeName(instance.primary_node))
2244       if pnode is None:
2245         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2246                                    self.op.pnode)
2247       os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2248       if not os_obj:
2249         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2250                                    " primary node"  % self.op.os_type)
2251
2252     self.instance = instance
2253
2254   def Exec(self, feedback_fn):
2255     """Reinstall the instance.
2256
2257     """
2258     inst = self.instance
2259
2260     if self.op.os_type is not None:
2261       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2262       inst.os = self.op.os_type
2263       self.cfg.AddInstance(inst)
2264
2265     _StartInstanceDisks(self.cfg, inst, None)
2266     try:
2267       feedback_fn("Running the instance OS create scripts...")
2268       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2269         raise errors.OpExecError("Could not install OS for instance %s"
2270                                  " on node %s" %
2271                                  (inst.name, inst.primary_node))
2272     finally:
2273       _ShutdownInstanceDisks(inst, self.cfg)
2274
2275
2276 class LURenameInstance(LogicalUnit):
2277   """Rename an instance.
2278
2279   """
2280   HPATH = "instance-rename"
2281   HTYPE = constants.HTYPE_INSTANCE
2282   _OP_REQP = ["instance_name", "new_name"]
2283
2284   def BuildHooksEnv(self):
2285     """Build hooks env.
2286
2287     This runs on master, primary and secondary nodes of the instance.
2288
2289     """
2290     env = _BuildInstanceHookEnvByObject(self.instance)
2291     env["INSTANCE_NEW_NAME"] = self.op.new_name
2292     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2293           list(self.instance.secondary_nodes))
2294     return env, nl, nl
2295
2296   def CheckPrereq(self):
2297     """Check prerequisites.
2298
2299     This checks that the instance is in the cluster and is not running.
2300
2301     """
2302     instance = self.cfg.GetInstanceInfo(
2303       self.cfg.ExpandInstanceName(self.op.instance_name))
2304     if instance is None:
2305       raise errors.OpPrereqError("Instance '%s' not known" %
2306                                  self.op.instance_name)
2307     if instance.status != "down":
2308       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2309                                  self.op.instance_name)
2310     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2311     if remote_info:
2312       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2313                                  (self.op.instance_name,
2314                                   instance.primary_node))
2315     self.instance = instance
2316
2317     # new name verification
2318     name_info = utils.HostInfo(self.op.new_name)
2319
2320     self.op.new_name = new_name = name_info.name
2321     instance_list = self.cfg.GetInstanceList()
2322     if new_name in instance_list:
2323       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2324                                  instance_name)
2325
2326     if not getattr(self.op, "ignore_ip", False):
2327       command = ["fping", "-q", name_info.ip]
2328       result = utils.RunCmd(command)
2329       if not result.failed:
2330         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2331                                    (name_info.ip, new_name))
2332
2333
2334   def Exec(self, feedback_fn):
2335     """Reinstall the instance.
2336
2337     """
2338     inst = self.instance
2339     old_name = inst.name
2340
2341     self.cfg.RenameInstance(inst.name, self.op.new_name)
2342
2343     # re-read the instance from the configuration after rename
2344     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2345
2346     _StartInstanceDisks(self.cfg, inst, None)
2347     try:
2348       if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2349                                           "sda", "sdb"):
2350         msg = ("Could run OS rename script for instance %s on node %s (but the"
2351                " instance has been renamed in Ganeti)" %
2352                (inst.name, inst.primary_node))
2353         logger.Error(msg)
2354     finally:
2355       _ShutdownInstanceDisks(inst, self.cfg)
2356
2357
2358 class LURemoveInstance(LogicalUnit):
2359   """Remove an instance.
2360
2361   """
2362   HPATH = "instance-remove"
2363   HTYPE = constants.HTYPE_INSTANCE
2364   _OP_REQP = ["instance_name"]
2365
2366   def BuildHooksEnv(self):
2367     """Build hooks env.
2368
2369     This runs on master, primary and secondary nodes of the instance.
2370
2371     """
2372     env = _BuildInstanceHookEnvByObject(self.instance)
2373     nl = [self.sstore.GetMasterNode()]
2374     return env, nl, nl
2375
2376   def CheckPrereq(self):
2377     """Check prerequisites.
2378
2379     This checks that the instance is in the cluster.
2380
2381     """
2382     instance = self.cfg.GetInstanceInfo(
2383       self.cfg.ExpandInstanceName(self.op.instance_name))
2384     if instance is None:
2385       raise errors.OpPrereqError("Instance '%s' not known" %
2386                                  self.op.instance_name)
2387     self.instance = instance
2388
2389   def Exec(self, feedback_fn):
2390     """Remove the instance.
2391
2392     """
2393     instance = self.instance
2394     logger.Info("shutting down instance %s on node %s" %
2395                 (instance.name, instance.primary_node))
2396
2397     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2398       if self.op.ignore_failures:
2399         feedback_fn("Warning: can't shutdown instance")
2400       else:
2401         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2402                                  (instance.name, instance.primary_node))
2403
2404     logger.Info("removing block devices for instance %s" % instance.name)
2405
2406     if not _RemoveDisks(instance, self.cfg):
2407       if self.op.ignore_failures:
2408         feedback_fn("Warning: can't remove instance's disks")
2409       else:
2410         raise errors.OpExecError("Can't remove instance's disks")
2411
2412     logger.Info("removing instance %s out of cluster config" % instance.name)
2413
2414     self.cfg.RemoveInstance(instance.name)
2415
2416
2417 class LUQueryInstances(NoHooksLU):
2418   """Logical unit for querying instances.
2419
2420   """
2421   _OP_REQP = ["output_fields", "names"]
2422
2423   def CheckPrereq(self):
2424     """Check prerequisites.
2425
2426     This checks that the fields required are valid output fields.
2427
2428     """
2429     self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2430     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2431                                "admin_state", "admin_ram",
2432                                "disk_template", "ip", "mac", "bridge",
2433                                "sda_size", "sdb_size", "vcpus"],
2434                        dynamic=self.dynamic_fields,
2435                        selected=self.op.output_fields)
2436
2437     self.wanted = _GetWantedInstances(self, self.op.names)
2438
2439   def Exec(self, feedback_fn):
2440     """Computes the list of nodes and their attributes.
2441
2442     """
2443     instance_names = self.wanted
2444     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2445                      in instance_names]
2446
2447     # begin data gathering
2448
2449     nodes = frozenset([inst.primary_node for inst in instance_list])
2450
2451     bad_nodes = []
2452     if self.dynamic_fields.intersection(self.op.output_fields):
2453       live_data = {}
2454       node_data = rpc.call_all_instances_info(nodes)
2455       for name in nodes:
2456         result = node_data[name]
2457         if result:
2458           live_data.update(result)
2459         elif result == False:
2460           bad_nodes.append(name)
2461         # else no instance is alive
2462     else:
2463       live_data = dict([(name, {}) for name in instance_names])
2464
2465     # end data gathering
2466
2467     output = []
2468     for instance in instance_list:
2469       iout = []
2470       for field in self.op.output_fields:
2471         if field == "name":
2472           val = instance.name
2473         elif field == "os":
2474           val = instance.os
2475         elif field == "pnode":
2476           val = instance.primary_node
2477         elif field == "snodes":
2478           val = list(instance.secondary_nodes)
2479         elif field == "admin_state":
2480           val = (instance.status != "down")
2481         elif field == "oper_state":
2482           if instance.primary_node in bad_nodes:
2483             val = None
2484           else:
2485             val = bool(live_data.get(instance.name))
2486         elif field == "status":
2487           if instance.primary_node in bad_nodes:
2488             val = "ERROR_nodedown"
2489           else:
2490             running = bool(live_data.get(instance.name))
2491             if running:
2492               if instance.status != "down":
2493                 val = "running"
2494               else:
2495                 val = "ERROR_up"
2496             else:
2497               if instance.status != "down":
2498                 val = "ERROR_down"
2499               else:
2500                 val = "ADMIN_down"
2501         elif field == "admin_ram":
2502           val = instance.memory
2503         elif field == "oper_ram":
2504           if instance.primary_node in bad_nodes:
2505             val = None
2506           elif instance.name in live_data:
2507             val = live_data[instance.name].get("memory", "?")
2508           else:
2509             val = "-"
2510         elif field == "disk_template":
2511           val = instance.disk_template
2512         elif field == "ip":
2513           val = instance.nics[0].ip
2514         elif field == "bridge":
2515           val = instance.nics[0].bridge
2516         elif field == "mac":
2517           val = instance.nics[0].mac
2518         elif field == "sda_size" or field == "sdb_size":
2519           disk = instance.FindDisk(field[:3])
2520           if disk is None:
2521             val = None
2522           else:
2523             val = disk.size
2524         elif field == "vcpus":
2525           val = instance.vcpus
2526         else:
2527           raise errors.ParameterError(field)
2528         iout.append(val)
2529       output.append(iout)
2530
2531     return output
2532
2533
2534 class LUFailoverInstance(LogicalUnit):
2535   """Failover an instance.
2536
2537   """
2538   HPATH = "instance-failover"
2539   HTYPE = constants.HTYPE_INSTANCE
2540   _OP_REQP = ["instance_name", "ignore_consistency"]
2541
2542   def BuildHooksEnv(self):
2543     """Build hooks env.
2544
2545     This runs on master, primary and secondary nodes of the instance.
2546
2547     """
2548     env = {
2549       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2550       }
2551     env.update(_BuildInstanceHookEnvByObject(self.instance))
2552     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2553     return env, nl, nl
2554
2555   def CheckPrereq(self):
2556     """Check prerequisites.
2557
2558     This checks that the instance is in the cluster.
2559
2560     """
2561     instance = self.cfg.GetInstanceInfo(
2562       self.cfg.ExpandInstanceName(self.op.instance_name))
2563     if instance is None:
2564       raise errors.OpPrereqError("Instance '%s' not known" %
2565                                  self.op.instance_name)
2566
2567     if instance.disk_template not in constants.DTS_NET_MIRROR:
2568       raise errors.OpPrereqError("Instance's disk layout is not"
2569                                  " network mirrored, cannot failover.")
2570
2571     secondary_nodes = instance.secondary_nodes
2572     if not secondary_nodes:
2573       raise errors.ProgrammerError("no secondary node but using "
2574                                    "DT_REMOTE_RAID1 template")
2575
2576     target_node = secondary_nodes[0]
2577     # check memory requirements on the secondary node
2578     _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2579                          instance.name, instance.memory)
2580
2581     # check bridge existance
2582     brlist = [nic.bridge for nic in instance.nics]
2583     if not rpc.call_bridges_exist(target_node, brlist):
2584       raise errors.OpPrereqError("One or more target bridges %s does not"
2585                                  " exist on destination node '%s'" %
2586                                  (brlist, target_node))
2587
2588     self.instance = instance
2589
2590   def Exec(self, feedback_fn):
2591     """Failover an instance.
2592
2593     The failover is done by shutting it down on its present node and
2594     starting it on the secondary.
2595
2596     """
2597     instance = self.instance
2598
2599     source_node = instance.primary_node
2600     target_node = instance.secondary_nodes[0]
2601
2602     feedback_fn("* checking disk consistency between source and target")
2603     for dev in instance.disks:
2604       # for remote_raid1, these are md over drbd
2605       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2606         if not self.op.ignore_consistency:
2607           raise errors.OpExecError("Disk %s is degraded on target node,"
2608                                    " aborting failover." % dev.iv_name)
2609
2610     feedback_fn("* shutting down instance on source node")
2611     logger.Info("Shutting down instance %s on node %s" %
2612                 (instance.name, source_node))
2613
2614     if not rpc.call_instance_shutdown(source_node, instance):
2615       if self.op.ignore_consistency:
2616         logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2617                      " anyway. Please make sure node %s is down"  %
2618                      (instance.name, source_node, source_node))
2619       else:
2620         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2621                                  (instance.name, source_node))
2622
2623     feedback_fn("* deactivating the instance's disks on source node")
2624     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2625       raise errors.OpExecError("Can't shut down the instance's disks.")
2626
2627     instance.primary_node = target_node
2628     # distribute new instance config to the other nodes
2629     self.cfg.AddInstance(instance)
2630
2631     feedback_fn("* activating the instance's disks on target node")
2632     logger.Info("Starting instance %s on node %s" %
2633                 (instance.name, target_node))
2634
2635     disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2636                                              ignore_secondaries=True)
2637     if not disks_ok:
2638       _ShutdownInstanceDisks(instance, self.cfg)
2639       raise errors.OpExecError("Can't activate the instance's disks")
2640
2641     feedback_fn("* starting the instance on the target node")
2642     if not rpc.call_instance_start(target_node, instance, None):
2643       _ShutdownInstanceDisks(instance, self.cfg)
2644       raise errors.OpExecError("Could not start instance %s on node %s." %
2645                                (instance.name, target_node))
2646
2647
2648 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2649   """Create a tree of block devices on the primary node.
2650
2651   This always creates all devices.
2652
2653   """
2654   if device.children:
2655     for child in device.children:
2656       if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2657         return False
2658
2659   cfg.SetDiskID(device, node)
2660   new_id = rpc.call_blockdev_create(node, device, device.size,
2661                                     instance.name, True, info)
2662   if not new_id:
2663     return False
2664   if device.physical_id is None:
2665     device.physical_id = new_id
2666   return True
2667
2668
2669 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2670   """Create a tree of block devices on a secondary node.
2671
2672   If this device type has to be created on secondaries, create it and
2673   all its children.
2674
2675   If not, just recurse to children keeping the same 'force' value.
2676
2677   """
2678   if device.CreateOnSecondary():
2679     force = True
2680   if device.children:
2681     for child in device.children:
2682       if not _CreateBlockDevOnSecondary(cfg, node, instance,
2683                                         child, force, info):
2684         return False
2685
2686   if not force:
2687     return True
2688   cfg.SetDiskID(device, node)
2689   new_id = rpc.call_blockdev_create(node, device, device.size,
2690                                     instance.name, False, info)
2691   if not new_id:
2692     return False
2693   if device.physical_id is None:
2694     device.physical_id = new_id
2695   return True
2696
2697
2698 def _GenerateUniqueNames(cfg, exts):
2699   """Generate a suitable LV name.
2700
2701   This will generate a logical volume name for the given instance.
2702
2703   """
2704   results = []
2705   for val in exts:
2706     new_id = cfg.GenerateUniqueID()
2707     results.append("%s%s" % (new_id, val))
2708   return results
2709
2710
2711 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2712   """Generate a drbd device complete with its children.
2713
2714   """
2715   port = cfg.AllocatePort()
2716   vgname = cfg.GetVGName()
2717   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2718                           logical_id=(vgname, names[0]))
2719   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2720                           logical_id=(vgname, names[1]))
2721   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2722                           logical_id = (primary, secondary, port),
2723                           children = [dev_data, dev_meta])
2724   return drbd_dev
2725
2726
2727 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2728   """Generate a drbd8 device complete with its children.
2729
2730   """
2731   port = cfg.AllocatePort()
2732   vgname = cfg.GetVGName()
2733   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2734                           logical_id=(vgname, names[0]))
2735   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2736                           logical_id=(vgname, names[1]))
2737   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2738                           logical_id = (primary, secondary, port),
2739                           children = [dev_data, dev_meta],
2740                           iv_name=iv_name)
2741   return drbd_dev
2742
2743
2744 def _GenerateDiskTemplate(cfg, template_name,
2745                           instance_name, primary_node,
2746                           secondary_nodes, disk_sz, swap_sz):
2747   """Generate the entire disk layout for a given template type.
2748
2749   """
2750   #TODO: compute space requirements
2751
2752   vgname = cfg.GetVGName()
2753   if template_name == constants.DT_DISKLESS:
2754     disks = []
2755   elif template_name == constants.DT_PLAIN:
2756     if len(secondary_nodes) != 0:
2757       raise errors.ProgrammerError("Wrong template configuration")
2758
2759     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2760     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2761                            logical_id=(vgname, names[0]),
2762                            iv_name = "sda")
2763     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2764                            logical_id=(vgname, names[1]),
2765                            iv_name = "sdb")
2766     disks = [sda_dev, sdb_dev]
2767   elif template_name == constants.DT_LOCAL_RAID1:
2768     if len(secondary_nodes) != 0:
2769       raise errors.ProgrammerError("Wrong template configuration")
2770
2771
2772     names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2773                                        ".sdb_m1", ".sdb_m2"])
2774     sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2775                               logical_id=(vgname, names[0]))
2776     sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2777                               logical_id=(vgname, names[1]))
2778     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
2779                               size=disk_sz,
2780                               children = [sda_dev_m1, sda_dev_m2])
2781     sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2782                               logical_id=(vgname, names[2]))
2783     sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2784                               logical_id=(vgname, names[3]))
2785     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
2786                               size=swap_sz,
2787                               children = [sdb_dev_m1, sdb_dev_m2])
2788     disks = [md_sda_dev, md_sdb_dev]
2789   elif template_name == constants.DT_REMOTE_RAID1:
2790     if len(secondary_nodes) != 1:
2791       raise errors.ProgrammerError("Wrong template configuration")
2792     remote_node = secondary_nodes[0]
2793     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2794                                        ".sdb_data", ".sdb_meta"])
2795     drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2796                                          disk_sz, names[0:2])
2797     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
2798                               children = [drbd_sda_dev], size=disk_sz)
2799     drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2800                                          swap_sz, names[2:4])
2801     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
2802                               children = [drbd_sdb_dev], size=swap_sz)
2803     disks = [md_sda_dev, md_sdb_dev]
2804   elif template_name == constants.DT_DRBD8:
2805     if len(secondary_nodes) != 1:
2806       raise errors.ProgrammerError("Wrong template configuration")
2807     remote_node = secondary_nodes[0]
2808     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2809                                        ".sdb_data", ".sdb_meta"])
2810     drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2811                                          disk_sz, names[0:2], "sda")
2812     drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2813                                          swap_sz, names[2:4], "sdb")
2814     disks = [drbd_sda_dev, drbd_sdb_dev]
2815   else:
2816     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2817   return disks
2818
2819
2820 def _GetInstanceInfoText(instance):
2821   """Compute that text that should be added to the disk's metadata.
2822
2823   """
2824   return "originstname+%s" % instance.name
2825
2826
2827 def _CreateDisks(cfg, instance):
2828   """Create all disks for an instance.
2829
2830   This abstracts away some work from AddInstance.
2831
2832   Args:
2833     instance: the instance object
2834
2835   Returns:
2836     True or False showing the success of the creation process
2837
2838   """
2839   info = _GetInstanceInfoText(instance)
2840
2841   for device in instance.disks:
2842     logger.Info("creating volume %s for instance %s" %
2843               (device.iv_name, instance.name))
2844     #HARDCODE
2845     for secondary_node in instance.secondary_nodes:
2846       if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2847                                         device, False, info):
2848         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2849                      (device.iv_name, device, secondary_node))
2850         return False
2851     #HARDCODE
2852     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2853                                     instance, device, info):
2854       logger.Error("failed to create volume %s on primary!" %
2855                    device.iv_name)
2856       return False
2857   return True
2858
2859
2860 def _RemoveDisks(instance, cfg):
2861   """Remove all disks for an instance.
2862
2863   This abstracts away some work from `AddInstance()` and
2864   `RemoveInstance()`. Note that in case some of the devices couldn't
2865   be removed, the removal will continue with the other ones (compare
2866   with `_CreateDisks()`).
2867
2868   Args:
2869     instance: the instance object
2870
2871   Returns:
2872     True or False showing the success of the removal proces
2873
2874   """
2875   logger.Info("removing block devices for instance %s" % instance.name)
2876
2877   result = True
2878   for device in instance.disks:
2879     for node, disk in device.ComputeNodeTree(instance.primary_node):
2880       cfg.SetDiskID(disk, node)
2881       if not rpc.call_blockdev_remove(node, disk):
2882         logger.Error("could not remove block device %s on node %s,"
2883                      " continuing anyway" %
2884                      (device.iv_name, node))
2885         result = False
2886   return result
2887
2888
2889 class LUCreateInstance(LogicalUnit):
2890   """Create an instance.
2891
2892   """
2893   HPATH = "instance-add"
2894   HTYPE = constants.HTYPE_INSTANCE
2895   _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2896               "disk_template", "swap_size", "mode", "start", "vcpus",
2897               "wait_for_sync", "ip_check", "mac"]
2898
2899   def BuildHooksEnv(self):
2900     """Build hooks env.
2901
2902     This runs on master, primary and secondary nodes of the instance.
2903
2904     """
2905     env = {
2906       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2907       "INSTANCE_DISK_SIZE": self.op.disk_size,
2908       "INSTANCE_SWAP_SIZE": self.op.swap_size,
2909       "INSTANCE_ADD_MODE": self.op.mode,
2910       }
2911     if self.op.mode == constants.INSTANCE_IMPORT:
2912       env["INSTANCE_SRC_NODE"] = self.op.src_node
2913       env["INSTANCE_SRC_PATH"] = self.op.src_path
2914       env["INSTANCE_SRC_IMAGE"] = self.src_image
2915
2916     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2917       primary_node=self.op.pnode,
2918       secondary_nodes=self.secondaries,
2919       status=self.instance_status,
2920       os_type=self.op.os_type,
2921       memory=self.op.mem_size,
2922       vcpus=self.op.vcpus,
2923       nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
2924     ))
2925
2926     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2927           self.secondaries)
2928     return env, nl, nl
2929
2930
2931   def CheckPrereq(self):
2932     """Check prerequisites.
2933
2934     """
2935     for attr in ["kernel_path", "initrd_path", "hvm_boot_order"]:
2936       if not hasattr(self.op, attr):
2937         setattr(self.op, attr, None)
2938
2939     if self.op.mode not in (constants.INSTANCE_CREATE,
2940                             constants.INSTANCE_IMPORT):
2941       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2942                                  self.op.mode)
2943
2944     if self.op.mode == constants.INSTANCE_IMPORT:
2945       src_node = getattr(self.op, "src_node", None)
2946       src_path = getattr(self.op, "src_path", None)
2947       if src_node is None or src_path is None:
2948         raise errors.OpPrereqError("Importing an instance requires source"
2949                                    " node and path options")
2950       src_node_full = self.cfg.ExpandNodeName(src_node)
2951       if src_node_full is None:
2952         raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2953       self.op.src_node = src_node = src_node_full
2954
2955       if not os.path.isabs(src_path):
2956         raise errors.OpPrereqError("The source path must be absolute")
2957
2958       export_info = rpc.call_export_info(src_node, src_path)
2959
2960       if not export_info:
2961         raise errors.OpPrereqError("No export found in dir %s" % src_path)
2962
2963       if not export_info.has_section(constants.INISECT_EXP):
2964         raise errors.ProgrammerError("Corrupted export config")
2965
2966       ei_version = export_info.get(constants.INISECT_EXP, 'version')
2967       if (int(ei_version) != constants.EXPORT_VERSION):
2968         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2969                                    (ei_version, constants.EXPORT_VERSION))
2970
2971       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2972         raise errors.OpPrereqError("Can't import instance with more than"
2973                                    " one data disk")
2974
2975       # FIXME: are the old os-es, disk sizes, etc. useful?
2976       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2977       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2978                                                          'disk0_dump'))
2979       self.src_image = diskimage
2980     else: # INSTANCE_CREATE
2981       if getattr(self.op, "os_type", None) is None:
2982         raise errors.OpPrereqError("No guest OS specified")
2983
2984     # check primary node
2985     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2986     if pnode is None:
2987       raise errors.OpPrereqError("Primary node '%s' is unknown" %
2988                                  self.op.pnode)
2989     self.op.pnode = pnode.name
2990     self.pnode = pnode
2991     self.secondaries = []
2992     # disk template and mirror node verification
2993     if self.op.disk_template not in constants.DISK_TEMPLATES:
2994       raise errors.OpPrereqError("Invalid disk template name")
2995
2996     if self.op.disk_template in constants.DTS_NET_MIRROR:
2997       if getattr(self.op, "snode", None) is None:
2998         raise errors.OpPrereqError("The networked disk templates need"
2999                                    " a mirror node")
3000
3001       snode_name = self.cfg.ExpandNodeName(self.op.snode)
3002       if snode_name is None:
3003         raise errors.OpPrereqError("Unknown secondary node '%s'" %
3004                                    self.op.snode)
3005       elif snode_name == pnode.name:
3006         raise errors.OpPrereqError("The secondary node cannot be"
3007                                    " the primary node.")
3008       self.secondaries.append(snode_name)
3009
3010     # Required free disk space as a function of disk and swap space
3011     req_size_dict = {
3012       constants.DT_DISKLESS: None,
3013       constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
3014       constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
3015       # 256 MB are added for drbd metadata, 128MB for each drbd device
3016       constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
3017       constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
3018     }
3019
3020     if self.op.disk_template not in req_size_dict:
3021       raise errors.ProgrammerError("Disk template '%s' size requirement"
3022                                    " is unknown" %  self.op.disk_template)
3023
3024     req_size = req_size_dict[self.op.disk_template]
3025
3026     # Check lv size requirements
3027     if req_size is not None:
3028       nodenames = [pnode.name] + self.secondaries
3029       nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3030       for node in nodenames:
3031         info = nodeinfo.get(node, None)
3032         if not info:
3033           raise errors.OpPrereqError("Cannot get current information"
3034                                      " from node '%s'" % nodeinfo)
3035         vg_free = info.get('vg_free', None)
3036         if not isinstance(vg_free, int):
3037           raise errors.OpPrereqError("Can't compute free disk space on"
3038                                      " node %s" % node)
3039         if req_size > info['vg_free']:
3040           raise errors.OpPrereqError("Not enough disk space on target node %s."
3041                                      " %d MB available, %d MB required" %
3042                                      (node, info['vg_free'], req_size))
3043
3044     # os verification
3045     os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3046     if not os_obj:
3047       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3048                                  " primary node"  % self.op.os_type)
3049
3050     if self.op.kernel_path == constants.VALUE_NONE:
3051       raise errors.OpPrereqError("Can't set instance kernel to none")
3052
3053     # instance verification
3054     hostname1 = utils.HostInfo(self.op.instance_name)
3055
3056     self.op.instance_name = instance_name = hostname1.name
3057     instance_list = self.cfg.GetInstanceList()
3058     if instance_name in instance_list:
3059       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3060                                  instance_name)
3061
3062     ip = getattr(self.op, "ip", None)
3063     if ip is None or ip.lower() == "none":
3064       inst_ip = None
3065     elif ip.lower() == "auto":
3066       inst_ip = hostname1.ip
3067     else:
3068       if not utils.IsValidIP(ip):
3069         raise errors.OpPrereqError("given IP address '%s' doesn't look"
3070                                    " like a valid IP" % ip)
3071       inst_ip = ip
3072     self.inst_ip = inst_ip
3073
3074     if self.op.start and not self.op.ip_check:
3075       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3076                                  " adding an instance in start mode")
3077
3078     if self.op.ip_check:
3079       if utils.TcpPing(utils.HostInfo().name, hostname1.ip,
3080                        constants.DEFAULT_NODED_PORT):
3081         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3082                                    (hostname1.ip, instance_name))
3083
3084     # MAC address verification
3085     if self.op.mac != "auto":
3086       if not utils.IsValidMac(self.op.mac.lower()):
3087         raise errors.OpPrereqError("invalid MAC address specified: %s" %
3088                                    self.op.mac)
3089
3090     # bridge verification
3091     bridge = getattr(self.op, "bridge", None)
3092     if bridge is None:
3093       self.op.bridge = self.cfg.GetDefBridge()
3094     else:
3095       self.op.bridge = bridge
3096
3097     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3098       raise errors.OpPrereqError("target bridge '%s' does not exist on"
3099                                  " destination node '%s'" %
3100                                  (self.op.bridge, pnode.name))
3101
3102     # boot order verification
3103     if self.op.hvm_boot_order is not None:
3104       if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3105         raise errors.OpPrereqError("invalid boot order specified,"
3106                                    " must be one or more of [acdn]")
3107
3108     if self.op.start:
3109       self.instance_status = 'up'
3110     else:
3111       self.instance_status = 'down'
3112
3113   def Exec(self, feedback_fn):
3114     """Create and add the instance to the cluster.
3115
3116     """
3117     instance = self.op.instance_name
3118     pnode_name = self.pnode.name
3119
3120     if self.op.mac == "auto":
3121       mac_address = self.cfg.GenerateMAC()
3122     else:
3123       mac_address = self.op.mac
3124
3125     nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3126     if self.inst_ip is not None:
3127       nic.ip = self.inst_ip
3128
3129     ht_kind = self.sstore.GetHypervisorType()
3130     if ht_kind in constants.HTS_REQ_PORT:
3131       network_port = self.cfg.AllocatePort()
3132     else:
3133       network_port = None
3134
3135     disks = _GenerateDiskTemplate(self.cfg,
3136                                   self.op.disk_template,
3137                                   instance, pnode_name,
3138                                   self.secondaries, self.op.disk_size,
3139                                   self.op.swap_size)
3140
3141     iobj = objects.Instance(name=instance, os=self.op.os_type,
3142                             primary_node=pnode_name,
3143                             memory=self.op.mem_size,
3144                             vcpus=self.op.vcpus,
3145                             nics=[nic], disks=disks,
3146                             disk_template=self.op.disk_template,
3147                             status=self.instance_status,
3148                             network_port=network_port,
3149                             kernel_path=self.op.kernel_path,
3150                             initrd_path=self.op.initrd_path,
3151                             hvm_boot_order=self.op.hvm_boot_order,
3152                             )
3153
3154     feedback_fn("* creating instance disks...")
3155     if not _CreateDisks(self.cfg, iobj):
3156       _RemoveDisks(iobj, self.cfg)
3157       raise errors.OpExecError("Device creation failed, reverting...")
3158
3159     feedback_fn("adding instance %s to cluster config" % instance)
3160
3161     self.cfg.AddInstance(iobj)
3162
3163     if self.op.wait_for_sync:
3164       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3165     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3166       # make sure the disks are not degraded (still sync-ing is ok)
3167       time.sleep(15)
3168       feedback_fn("* checking mirrors status")
3169       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3170     else:
3171       disk_abort = False
3172
3173     if disk_abort:
3174       _RemoveDisks(iobj, self.cfg)
3175       self.cfg.RemoveInstance(iobj.name)
3176       raise errors.OpExecError("There are some degraded disks for"
3177                                " this instance")
3178
3179     feedback_fn("creating os for instance %s on node %s" %
3180                 (instance, pnode_name))
3181
3182     if iobj.disk_template != constants.DT_DISKLESS:
3183       if self.op.mode == constants.INSTANCE_CREATE:
3184         feedback_fn("* running the instance OS create scripts...")
3185         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3186           raise errors.OpExecError("could not add os for instance %s"
3187                                    " on node %s" %
3188                                    (instance, pnode_name))
3189
3190       elif self.op.mode == constants.INSTANCE_IMPORT:
3191         feedback_fn("* running the instance OS import scripts...")
3192         src_node = self.op.src_node
3193         src_image = self.src_image
3194         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3195                                                 src_node, src_image):
3196           raise errors.OpExecError("Could not import os for instance"
3197                                    " %s on node %s" %
3198                                    (instance, pnode_name))
3199       else:
3200         # also checked in the prereq part
3201         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3202                                      % self.op.mode)
3203
3204     if self.op.start:
3205       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3206       feedback_fn("* starting instance...")
3207       if not rpc.call_instance_start(pnode_name, iobj, None):
3208         raise errors.OpExecError("Could not start instance")
3209
3210
3211 class LUConnectConsole(NoHooksLU):
3212   """Connect to an instance's console.
3213
3214   This is somewhat special in that it returns the command line that
3215   you need to run on the master node in order to connect to the
3216   console.
3217
3218   """
3219   _OP_REQP = ["instance_name"]
3220
3221   def CheckPrereq(self):
3222     """Check prerequisites.
3223
3224     This checks that the instance is in the cluster.
3225
3226     """
3227     instance = self.cfg.GetInstanceInfo(
3228       self.cfg.ExpandInstanceName(self.op.instance_name))
3229     if instance is None:
3230       raise errors.OpPrereqError("Instance '%s' not known" %
3231                                  self.op.instance_name)
3232     self.instance = instance
3233
3234   def Exec(self, feedback_fn):
3235     """Connect to the console of an instance
3236
3237     """
3238     instance = self.instance
3239     node = instance.primary_node
3240
3241     node_insts = rpc.call_instance_list([node])[node]
3242     if node_insts is False:
3243       raise errors.OpExecError("Can't connect to node %s." % node)
3244
3245     if instance.name not in node_insts:
3246       raise errors.OpExecError("Instance %s is not running." % instance.name)
3247
3248     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3249
3250     hyper = hypervisor.GetHypervisor()
3251     console_cmd = hyper.GetShellCommandForConsole(instance)
3252     # build ssh cmdline
3253     argv = ["ssh", "-q", "-t"]
3254     argv.extend(ssh.KNOWN_HOSTS_OPTS)
3255     argv.extend(ssh.BATCH_MODE_OPTS)
3256     argv.append(node)
3257     argv.append(console_cmd)
3258     return "ssh", argv
3259
3260
3261 class LUAddMDDRBDComponent(LogicalUnit):
3262   """Adda new mirror member to an instance's disk.
3263
3264   """
3265   HPATH = "mirror-add"
3266   HTYPE = constants.HTYPE_INSTANCE
3267   _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3268
3269   def BuildHooksEnv(self):
3270     """Build hooks env.
3271
3272     This runs on the master, the primary and all the secondaries.
3273
3274     """
3275     env = {
3276       "NEW_SECONDARY": self.op.remote_node,
3277       "DISK_NAME": self.op.disk_name,
3278       }
3279     env.update(_BuildInstanceHookEnvByObject(self.instance))
3280     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3281           self.op.remote_node,] + list(self.instance.secondary_nodes)
3282     return env, nl, nl
3283
3284   def CheckPrereq(self):
3285     """Check prerequisites.
3286
3287     This checks that the instance is in the cluster.
3288
3289     """
3290     instance = self.cfg.GetInstanceInfo(
3291       self.cfg.ExpandInstanceName(self.op.instance_name))
3292     if instance is None:
3293       raise errors.OpPrereqError("Instance '%s' not known" %
3294                                  self.op.instance_name)
3295     self.instance = instance
3296
3297     remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3298     if remote_node is None:
3299       raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3300     self.remote_node = remote_node
3301
3302     if remote_node == instance.primary_node:
3303       raise errors.OpPrereqError("The specified node is the primary node of"
3304                                  " the instance.")
3305
3306     if instance.disk_template != constants.DT_REMOTE_RAID1:
3307       raise errors.OpPrereqError("Instance's disk layout is not"
3308                                  " remote_raid1.")
3309     for disk in instance.disks:
3310       if disk.iv_name == self.op.disk_name:
3311         break
3312     else:
3313       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3314                                  " instance." % self.op.disk_name)
3315     if len(disk.children) > 1:
3316       raise errors.OpPrereqError("The device already has two slave devices."
3317                                  " This would create a 3-disk raid1 which we"
3318                                  " don't allow.")
3319     self.disk = disk
3320
3321   def Exec(self, feedback_fn):
3322     """Add the mirror component
3323
3324     """
3325     disk = self.disk
3326     instance = self.instance
3327
3328     remote_node = self.remote_node
3329     lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3330     names = _GenerateUniqueNames(self.cfg, lv_names)
3331     new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3332                                      remote_node, disk.size, names)
3333
3334     logger.Info("adding new mirror component on secondary")
3335     #HARDCODE
3336     if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
3337                                       new_drbd, False,
3338                                       _GetInstanceInfoText(instance)):
3339       raise errors.OpExecError("Failed to create new component on secondary"
3340                                " node %s" % remote_node)
3341
3342     logger.Info("adding new mirror component on primary")
3343     #HARDCODE
3344     if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
3345                                     instance, new_drbd,
3346                                     _GetInstanceInfoText(instance)):
3347       # remove secondary dev
3348       self.cfg.SetDiskID(new_drbd, remote_node)
3349       rpc.call_blockdev_remove(remote_node, new_drbd)
3350       raise errors.OpExecError("Failed to create volume on primary")
3351
3352     # the device exists now
3353     # call the primary node to add the mirror to md
3354     logger.Info("adding new mirror component to md")
3355     if not rpc.call_blockdev_addchildren(instance.primary_node,
3356                                          disk, [new_drbd]):
3357       logger.Error("Can't add mirror compoment to md!")
3358       self.cfg.SetDiskID(new_drbd, remote_node)
3359       if not rpc.call_blockdev_remove(remote_node, new_drbd):
3360         logger.Error("Can't rollback on secondary")
3361       self.cfg.SetDiskID(new_drbd, instance.primary_node)
3362       if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3363         logger.Error("Can't rollback on primary")
3364       raise errors.OpExecError("Can't add mirror component to md array")
3365
3366     disk.children.append(new_drbd)
3367
3368     self.cfg.AddInstance(instance)
3369
3370     _WaitForSync(self.cfg, instance, self.proc)
3371
3372     return 0
3373
3374
3375 class LURemoveMDDRBDComponent(LogicalUnit):
3376   """Remove a component from a remote_raid1 disk.
3377
3378   """
3379   HPATH = "mirror-remove"
3380   HTYPE = constants.HTYPE_INSTANCE
3381   _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3382
3383   def BuildHooksEnv(self):
3384     """Build hooks env.
3385
3386     This runs on the master, the primary and all the secondaries.
3387
3388     """
3389     env = {
3390       "DISK_NAME": self.op.disk_name,
3391       "DISK_ID": self.op.disk_id,
3392       "OLD_SECONDARY": self.old_secondary,
3393       }
3394     env.update(_BuildInstanceHookEnvByObject(self.instance))
3395     nl = [self.sstore.GetMasterNode(),
3396           self.instance.primary_node] + list(self.instance.secondary_nodes)
3397     return env, nl, nl
3398
3399   def CheckPrereq(self):
3400     """Check prerequisites.
3401
3402     This checks that the instance is in the cluster.
3403
3404     """
3405     instance = self.cfg.GetInstanceInfo(
3406       self.cfg.ExpandInstanceName(self.op.instance_name))
3407     if instance is None:
3408       raise errors.OpPrereqError("Instance '%s' not known" %
3409                                  self.op.instance_name)
3410     self.instance = instance
3411
3412     if instance.disk_template != constants.DT_REMOTE_RAID1:
3413       raise errors.OpPrereqError("Instance's disk layout is not"
3414                                  " remote_raid1.")
3415     for disk in instance.disks:
3416       if disk.iv_name == self.op.disk_name:
3417         break
3418     else:
3419       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3420                                  " instance." % self.op.disk_name)
3421     for child in disk.children:
3422       if (child.dev_type == constants.LD_DRBD7 and
3423           child.logical_id[2] == self.op.disk_id):
3424         break
3425     else:
3426       raise errors.OpPrereqError("Can't find the device with this port.")
3427
3428     if len(disk.children) < 2:
3429       raise errors.OpPrereqError("Cannot remove the last component from"
3430                                  " a mirror.")
3431     self.disk = disk
3432     self.child = child
3433     if self.child.logical_id[0] == instance.primary_node:
3434       oid = 1
3435     else:
3436       oid = 0
3437     self.old_secondary = self.child.logical_id[oid]
3438
3439   def Exec(self, feedback_fn):
3440     """Remove the mirror component
3441
3442     """
3443     instance = self.instance
3444     disk = self.disk
3445     child = self.child
3446     logger.Info("remove mirror component")
3447     self.cfg.SetDiskID(disk, instance.primary_node)
3448     if not rpc.call_blockdev_removechildren(instance.primary_node,
3449                                             disk, [child]):
3450       raise errors.OpExecError("Can't remove child from mirror.")
3451
3452     for node in child.logical_id[:2]:
3453       self.cfg.SetDiskID(child, node)
3454       if not rpc.call_blockdev_remove(node, child):
3455         logger.Error("Warning: failed to remove device from node %s,"
3456                      " continuing operation." % node)
3457
3458     disk.children.remove(child)
3459     self.cfg.AddInstance(instance)
3460
3461
3462 class LUReplaceDisks(LogicalUnit):
3463   """Replace the disks of an instance.
3464
3465   """
3466   HPATH = "mirrors-replace"
3467   HTYPE = constants.HTYPE_INSTANCE
3468   _OP_REQP = ["instance_name", "mode", "disks"]
3469
3470   def BuildHooksEnv(self):
3471     """Build hooks env.
3472
3473     This runs on the master, the primary and all the secondaries.
3474
3475     """
3476     env = {
3477       "MODE": self.op.mode,
3478       "NEW_SECONDARY": self.op.remote_node,
3479       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3480       }
3481     env.update(_BuildInstanceHookEnvByObject(self.instance))
3482     nl = [
3483       self.sstore.GetMasterNode(),
3484       self.instance.primary_node,
3485       ]
3486     if self.op.remote_node is not None:
3487       nl.append(self.op.remote_node)
3488     return env, nl, nl
3489
3490   def CheckPrereq(self):
3491     """Check prerequisites.
3492
3493     This checks that the instance is in the cluster.
3494
3495     """
3496     instance = self.cfg.GetInstanceInfo(
3497       self.cfg.ExpandInstanceName(self.op.instance_name))
3498     if instance is None:
3499       raise errors.OpPrereqError("Instance '%s' not known" %
3500                                  self.op.instance_name)
3501     self.instance = instance
3502     self.op.instance_name = instance.name
3503
3504     if instance.disk_template not in constants.DTS_NET_MIRROR:
3505       raise errors.OpPrereqError("Instance's disk layout is not"
3506                                  " network mirrored.")
3507
3508     if len(instance.secondary_nodes) != 1:
3509       raise errors.OpPrereqError("The instance has a strange layout,"
3510                                  " expected one secondary but found %d" %
3511                                  len(instance.secondary_nodes))
3512
3513     self.sec_node = instance.secondary_nodes[0]
3514
3515     remote_node = getattr(self.op, "remote_node", None)
3516     if remote_node is not None:
3517       remote_node = self.cfg.ExpandNodeName(remote_node)
3518       if remote_node is None:
3519         raise errors.OpPrereqError("Node '%s' not known" %
3520                                    self.op.remote_node)
3521       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3522     else:
3523       self.remote_node_info = None
3524     if remote_node == instance.primary_node:
3525       raise errors.OpPrereqError("The specified node is the primary node of"
3526                                  " the instance.")
3527     elif remote_node == self.sec_node:
3528       if self.op.mode == constants.REPLACE_DISK_SEC:
3529         # this is for DRBD8, where we can't execute the same mode of
3530         # replacement as for drbd7 (no different port allocated)
3531         raise errors.OpPrereqError("Same secondary given, cannot execute"
3532                                    " replacement")
3533       # the user gave the current secondary, switch to
3534       # 'no-replace-secondary' mode for drbd7
3535       remote_node = None
3536     if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3537         self.op.mode != constants.REPLACE_DISK_ALL):
3538       raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3539                                  " disks replacement, not individual ones")
3540     if instance.disk_template == constants.DT_DRBD8:
3541       if (self.op.mode == constants.REPLACE_DISK_ALL and
3542           remote_node is not None):
3543         # switch to replace secondary mode
3544         self.op.mode = constants.REPLACE_DISK_SEC
3545
3546       if self.op.mode == constants.REPLACE_DISK_ALL:
3547         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3548                                    " secondary disk replacement, not"
3549                                    " both at once")
3550       elif self.op.mode == constants.REPLACE_DISK_PRI:
3551         if remote_node is not None:
3552           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3553                                      " the secondary while doing a primary"
3554                                      " node disk replacement")
3555         self.tgt_node = instance.primary_node
3556         self.oth_node = instance.secondary_nodes[0]
3557       elif self.op.mode == constants.REPLACE_DISK_SEC:
3558         self.new_node = remote_node # this can be None, in which case
3559                                     # we don't change the secondary
3560         self.tgt_node = instance.secondary_nodes[0]
3561         self.oth_node = instance.primary_node
3562       else:
3563         raise errors.ProgrammerError("Unhandled disk replace mode")
3564
3565     for name in self.op.disks:
3566       if instance.FindDisk(name) is None:
3567         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3568                                    (name, instance.name))
3569     self.op.remote_node = remote_node
3570
3571   def _ExecRR1(self, feedback_fn):
3572     """Replace the disks of an instance.
3573
3574     """
3575     instance = self.instance
3576     iv_names = {}
3577     # start of work
3578     if self.op.remote_node is None:
3579       remote_node = self.sec_node
3580     else:
3581       remote_node = self.op.remote_node
3582     cfg = self.cfg
3583     for dev in instance.disks:
3584       size = dev.size
3585       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3586       names = _GenerateUniqueNames(cfg, lv_names)
3587       new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3588                                        remote_node, size, names)
3589       iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3590       logger.Info("adding new mirror component on secondary for %s" %
3591                   dev.iv_name)
3592       #HARDCODE
3593       if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3594                                         new_drbd, False,
3595                                         _GetInstanceInfoText(instance)):
3596         raise errors.OpExecError("Failed to create new component on secondary"
3597                                  " node %s. Full abort, cleanup manually!" %
3598                                  remote_node)
3599
3600       logger.Info("adding new mirror component on primary")
3601       #HARDCODE
3602       if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3603                                       instance, new_drbd,
3604                                       _GetInstanceInfoText(instance)):
3605         # remove secondary dev
3606         cfg.SetDiskID(new_drbd, remote_node)
3607         rpc.call_blockdev_remove(remote_node, new_drbd)
3608         raise errors.OpExecError("Failed to create volume on primary!"
3609                                  " Full abort, cleanup manually!!")
3610
3611       # the device exists now
3612       # call the primary node to add the mirror to md
3613       logger.Info("adding new mirror component to md")
3614       if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3615                                            [new_drbd]):
3616         logger.Error("Can't add mirror compoment to md!")
3617         cfg.SetDiskID(new_drbd, remote_node)
3618         if not rpc.call_blockdev_remove(remote_node, new_drbd):
3619           logger.Error("Can't rollback on secondary")
3620         cfg.SetDiskID(new_drbd, instance.primary_node)
3621         if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3622           logger.Error("Can't rollback on primary")
3623         raise errors.OpExecError("Full abort, cleanup manually!!")
3624
3625       dev.children.append(new_drbd)
3626       cfg.AddInstance(instance)
3627
3628     # this can fail as the old devices are degraded and _WaitForSync
3629     # does a combined result over all disks, so we don't check its
3630     # return value
3631     _WaitForSync(cfg, instance, self.proc, unlock=True)
3632
3633     # so check manually all the devices
3634     for name in iv_names:
3635       dev, child, new_drbd = iv_names[name]
3636       cfg.SetDiskID(dev, instance.primary_node)
3637       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3638       if is_degr:
3639         raise errors.OpExecError("MD device %s is degraded!" % name)
3640       cfg.SetDiskID(new_drbd, instance.primary_node)
3641       is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3642       if is_degr:
3643         raise errors.OpExecError("New drbd device %s is degraded!" % name)
3644
3645     for name in iv_names:
3646       dev, child, new_drbd = iv_names[name]
3647       logger.Info("remove mirror %s component" % name)
3648       cfg.SetDiskID(dev, instance.primary_node)
3649       if not rpc.call_blockdev_removechildren(instance.primary_node,
3650                                               dev, [child]):
3651         logger.Error("Can't remove child from mirror, aborting"
3652                      " *this device cleanup*.\nYou need to cleanup manually!!")
3653         continue
3654
3655       for node in child.logical_id[:2]:
3656         logger.Info("remove child device on %s" % node)
3657         cfg.SetDiskID(child, node)
3658         if not rpc.call_blockdev_remove(node, child):
3659           logger.Error("Warning: failed to remove device from node %s,"
3660                        " continuing operation." % node)
3661
3662       dev.children.remove(child)
3663
3664       cfg.AddInstance(instance)
3665
3666   def _ExecD8DiskOnly(self, feedback_fn):
3667     """Replace a disk on the primary or secondary for dbrd8.
3668
3669     The algorithm for replace is quite complicated:
3670       - for each disk to be replaced:
3671         - create new LVs on the target node with unique names
3672         - detach old LVs from the drbd device
3673         - rename old LVs to name_replaced.<time_t>
3674         - rename new LVs to old LVs
3675         - attach the new LVs (with the old names now) to the drbd device
3676       - wait for sync across all devices
3677       - for each modified disk:
3678         - remove old LVs (which have the name name_replaces.<time_t>)
3679
3680     Failures are not very well handled.
3681
3682     """
3683     steps_total = 6
3684     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3685     instance = self.instance
3686     iv_names = {}
3687     vgname = self.cfg.GetVGName()
3688     # start of work
3689     cfg = self.cfg
3690     tgt_node = self.tgt_node
3691     oth_node = self.oth_node
3692
3693     # Step: check device activation
3694     self.proc.LogStep(1, steps_total, "check device existence")
3695     info("checking volume groups")
3696     my_vg = cfg.GetVGName()
3697     results = rpc.call_vg_list([oth_node, tgt_node])
3698     if not results:
3699       raise errors.OpExecError("Can't list volume groups on the nodes")
3700     for node in oth_node, tgt_node:
3701       res = results.get(node, False)
3702       if not res or my_vg not in res:
3703         raise errors.OpExecError("Volume group '%s' not found on %s" %
3704                                  (my_vg, node))
3705     for dev in instance.disks:
3706       if not dev.iv_name in self.op.disks:
3707         continue
3708       for node in tgt_node, oth_node:
3709         info("checking %s on %s" % (dev.iv_name, node))
3710         cfg.SetDiskID(dev, node)
3711         if not rpc.call_blockdev_find(node, dev):
3712           raise errors.OpExecError("Can't find device %s on node %s" %
3713                                    (dev.iv_name, node))
3714
3715     # Step: check other node consistency
3716     self.proc.LogStep(2, steps_total, "check peer consistency")
3717     for dev in instance.disks:
3718       if not dev.iv_name in self.op.disks:
3719         continue
3720       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3721       if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3722                                    oth_node==instance.primary_node):
3723         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3724                                  " to replace disks on this node (%s)" %
3725                                  (oth_node, tgt_node))
3726
3727     # Step: create new storage
3728     self.proc.LogStep(3, steps_total, "allocate new storage")
3729     for dev in instance.disks:
3730       if not dev.iv_name in self.op.disks:
3731         continue
3732       size = dev.size
3733       cfg.SetDiskID(dev, tgt_node)
3734       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3735       names = _GenerateUniqueNames(cfg, lv_names)
3736       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3737                              logical_id=(vgname, names[0]))
3738       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3739                              logical_id=(vgname, names[1]))
3740       new_lvs = [lv_data, lv_meta]
3741       old_lvs = dev.children
3742       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3743       info("creating new local storage on %s for %s" %
3744            (tgt_node, dev.iv_name))
3745       # since we *always* want to create this LV, we use the
3746       # _Create...OnPrimary (which forces the creation), even if we
3747       # are talking about the secondary node
3748       for new_lv in new_lvs:
3749         if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3750                                         _GetInstanceInfoText(instance)):
3751           raise errors.OpExecError("Failed to create new LV named '%s' on"
3752                                    " node '%s'" %
3753                                    (new_lv.logical_id[1], tgt_node))
3754
3755     # Step: for each lv, detach+rename*2+attach
3756     self.proc.LogStep(4, steps_total, "change drbd configuration")
3757     for dev, old_lvs, new_lvs in iv_names.itervalues():
3758       info("detaching %s drbd from local storage" % dev.iv_name)
3759       if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3760         raise errors.OpExecError("Can't detach drbd from local storage on node"
3761                                  " %s for device %s" % (tgt_node, dev.iv_name))
3762       #dev.children = []
3763       #cfg.Update(instance)
3764
3765       # ok, we created the new LVs, so now we know we have the needed
3766       # storage; as such, we proceed on the target node to rename
3767       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3768       # using the assumption that logical_id == physical_id (which in
3769       # turn is the unique_id on that node)
3770
3771       # FIXME(iustin): use a better name for the replaced LVs
3772       temp_suffix = int(time.time())
3773       ren_fn = lambda d, suff: (d.physical_id[0],
3774                                 d.physical_id[1] + "_replaced-%s" % suff)
3775       # build the rename list based on what LVs exist on the node
3776       rlist = []
3777       for to_ren in old_lvs:
3778         find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3779         if find_res is not None: # device exists
3780           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3781
3782       info("renaming the old LVs on the target node")
3783       if not rpc.call_blockdev_rename(tgt_node, rlist):
3784         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3785       # now we rename the new LVs to the old LVs
3786       info("renaming the new LVs on the target node")
3787       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3788       if not rpc.call_blockdev_rename(tgt_node, rlist):
3789         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3790
3791       for old, new in zip(old_lvs, new_lvs):
3792         new.logical_id = old.logical_id
3793         cfg.SetDiskID(new, tgt_node)
3794
3795       for disk in old_lvs:
3796         disk.logical_id = ren_fn(disk, temp_suffix)
3797         cfg.SetDiskID(disk, tgt_node)
3798
3799       # now that the new lvs have the old name, we can add them to the device
3800       info("adding new mirror component on %s" % tgt_node)
3801       if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3802         for new_lv in new_lvs:
3803           if not rpc.call_blockdev_remove(tgt_node, new_lv):
3804             warning("Can't rollback device %s", hint="manually cleanup unused"
3805                     " logical volumes")
3806         raise errors.OpExecError("Can't add local storage to drbd")
3807
3808       dev.children = new_lvs
3809       cfg.Update(instance)
3810
3811     # Step: wait for sync
3812
3813     # this can fail as the old devices are degraded and _WaitForSync
3814     # does a combined result over all disks, so we don't check its
3815     # return value
3816     self.proc.LogStep(5, steps_total, "sync devices")
3817     _WaitForSync(cfg, instance, self.proc, unlock=True)
3818
3819     # so check manually all the devices
3820     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3821       cfg.SetDiskID(dev, instance.primary_node)
3822       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3823       if is_degr:
3824         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3825
3826     # Step: remove old storage
3827     self.proc.LogStep(6, steps_total, "removing old storage")
3828     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3829       info("remove logical volumes for %s" % name)
3830       for lv in old_lvs:
3831         cfg.SetDiskID(lv, tgt_node)
3832         if not rpc.call_blockdev_remove(tgt_node, lv):
3833           warning("Can't remove old LV", hint="manually remove unused LVs")
3834           continue
3835
3836   def _ExecD8Secondary(self, feedback_fn):
3837     """Replace the secondary node for drbd8.
3838
3839     The algorithm for replace is quite complicated:
3840       - for all disks of the instance:
3841         - create new LVs on the new node with same names
3842         - shutdown the drbd device on the old secondary
3843         - disconnect the drbd network on the primary
3844         - create the drbd device on the new secondary
3845         - network attach the drbd on the primary, using an artifice:
3846           the drbd code for Attach() will connect to the network if it
3847           finds a device which is connected to the good local disks but
3848           not network enabled
3849       - wait for sync across all devices
3850       - remove all disks from the old secondary
3851
3852     Failures are not very well handled.
3853
3854     """
3855     steps_total = 6
3856     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3857     instance = self.instance
3858     iv_names = {}
3859     vgname = self.cfg.GetVGName()
3860     # start of work
3861     cfg = self.cfg
3862     old_node = self.tgt_node
3863     new_node = self.new_node
3864     pri_node = instance.primary_node
3865
3866     # Step: check device activation
3867     self.proc.LogStep(1, steps_total, "check device existence")
3868     info("checking volume groups")
3869     my_vg = cfg.GetVGName()
3870     results = rpc.call_vg_list([pri_node, new_node])
3871     if not results:
3872       raise errors.OpExecError("Can't list volume groups on the nodes")
3873     for node in pri_node, new_node:
3874       res = results.get(node, False)
3875       if not res or my_vg not in res:
3876         raise errors.OpExecError("Volume group '%s' not found on %s" %
3877                                  (my_vg, node))
3878     for dev in instance.disks:
3879       if not dev.iv_name in self.op.disks:
3880         continue
3881       info("checking %s on %s" % (dev.iv_name, pri_node))
3882       cfg.SetDiskID(dev, pri_node)
3883       if not rpc.call_blockdev_find(pri_node, dev):
3884         raise errors.OpExecError("Can't find device %s on node %s" %
3885                                  (dev.iv_name, pri_node))
3886
3887     # Step: check other node consistency
3888     self.proc.LogStep(2, steps_total, "check peer consistency")
3889     for dev in instance.disks:
3890       if not dev.iv_name in self.op.disks:
3891         continue
3892       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3893       if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3894         raise errors.OpExecError("Primary node (%s) has degraded storage,"
3895                                  " unsafe to replace the secondary" %
3896                                  pri_node)
3897
3898     # Step: create new storage
3899     self.proc.LogStep(3, steps_total, "allocate new storage")
3900     for dev in instance.disks:
3901       size = dev.size
3902       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3903       # since we *always* want to create this LV, we use the
3904       # _Create...OnPrimary (which forces the creation), even if we
3905       # are talking about the secondary node
3906       for new_lv in dev.children:
3907         if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3908                                         _GetInstanceInfoText(instance)):
3909           raise errors.OpExecError("Failed to create new LV named '%s' on"
3910                                    " node '%s'" %
3911                                    (new_lv.logical_id[1], new_node))
3912
3913       iv_names[dev.iv_name] = (dev, dev.children)
3914
3915     self.proc.LogStep(4, steps_total, "changing drbd configuration")
3916     for dev in instance.disks:
3917       size = dev.size
3918       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3919       # create new devices on new_node
3920       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3921                               logical_id=(pri_node, new_node,
3922                                           dev.logical_id[2]),
3923                               children=dev.children)
3924       if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3925                                         new_drbd, False,
3926                                       _GetInstanceInfoText(instance)):
3927         raise errors.OpExecError("Failed to create new DRBD on"
3928                                  " node '%s'" % new_node)
3929
3930     for dev in instance.disks:
3931       # we have new devices, shutdown the drbd on the old secondary
3932       info("shutting down drbd for %s on old node" % dev.iv_name)
3933       cfg.SetDiskID(dev, old_node)
3934       if not rpc.call_blockdev_shutdown(old_node, dev):
3935         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3936                 hint="Please cleanup this device manually as soon as possible")
3937
3938     info("detaching primary drbds from the network (=> standalone)")
3939     done = 0
3940     for dev in instance.disks:
3941       cfg.SetDiskID(dev, pri_node)
3942       # set the physical (unique in bdev terms) id to None, meaning
3943       # detach from network
3944       dev.physical_id = (None,) * len(dev.physical_id)
3945       # and 'find' the device, which will 'fix' it to match the
3946       # standalone state
3947       if rpc.call_blockdev_find(pri_node, dev):
3948         done += 1
3949       else:
3950         warning("Failed to detach drbd %s from network, unusual case" %
3951                 dev.iv_name)
3952
3953     if not done:
3954       # no detaches succeeded (very unlikely)
3955       raise errors.OpExecError("Can't detach at least one DRBD from old node")
3956
3957     # if we managed to detach at least one, we update all the disks of
3958     # the instance to point to the new secondary
3959     info("updating instance configuration")
3960     for dev in instance.disks:
3961       dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3962       cfg.SetDiskID(dev, pri_node)
3963     cfg.Update(instance)
3964
3965     # and now perform the drbd attach
3966     info("attaching primary drbds to new secondary (standalone => connected)")
3967     failures = []
3968     for dev in instance.disks:
3969       info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3970       # since the attach is smart, it's enough to 'find' the device,
3971       # it will automatically activate the network, if the physical_id
3972       # is correct
3973       cfg.SetDiskID(dev, pri_node)
3974       if not rpc.call_blockdev_find(pri_node, dev):
3975         warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3976                 "please do a gnt-instance info to see the status of disks")
3977
3978     # this can fail as the old devices are degraded and _WaitForSync
3979     # does a combined result over all disks, so we don't check its
3980     # return value
3981     self.proc.LogStep(5, steps_total, "sync devices")
3982     _WaitForSync(cfg, instance, self.proc, unlock=True)
3983
3984     # so check manually all the devices
3985     for name, (dev, old_lvs) in iv_names.iteritems():
3986       cfg.SetDiskID(dev, pri_node)
3987       is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3988       if is_degr:
3989         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3990
3991     self.proc.LogStep(6, steps_total, "removing old storage")
3992     for name, (dev, old_lvs) in iv_names.iteritems():
3993       info("remove logical volumes for %s" % name)
3994       for lv in old_lvs:
3995         cfg.SetDiskID(lv, old_node)
3996         if not rpc.call_blockdev_remove(old_node, lv):
3997           warning("Can't remove LV on old secondary",
3998                   hint="Cleanup stale volumes by hand")
3999
4000   def Exec(self, feedback_fn):
4001     """Execute disk replacement.
4002
4003     This dispatches the disk replacement to the appropriate handler.
4004
4005     """
4006     instance = self.instance
4007     if instance.disk_template == constants.DT_REMOTE_RAID1:
4008       fn = self._ExecRR1
4009     elif instance.disk_template == constants.DT_DRBD8:
4010       if self.op.remote_node is None:
4011         fn = self._ExecD8DiskOnly
4012       else:
4013         fn = self._ExecD8Secondary
4014     else:
4015       raise errors.ProgrammerError("Unhandled disk replacement case")
4016     return fn(feedback_fn)
4017
4018
4019 class LUQueryInstanceData(NoHooksLU):
4020   """Query runtime instance data.
4021
4022   """
4023   _OP_REQP = ["instances"]
4024
4025   def CheckPrereq(self):
4026     """Check prerequisites.
4027
4028     This only checks the optional instance list against the existing names.
4029
4030     """
4031     if not isinstance(self.op.instances, list):
4032       raise errors.OpPrereqError("Invalid argument type 'instances'")
4033     if self.op.instances:
4034       self.wanted_instances = []
4035       names = self.op.instances
4036       for name in names:
4037         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4038         if instance is None:
4039           raise errors.OpPrereqError("No such instance name '%s'" % name)
4040         self.wanted_instances.append(instance)
4041     else:
4042       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4043                                in self.cfg.GetInstanceList()]
4044     return
4045
4046
4047   def _ComputeDiskStatus(self, instance, snode, dev):
4048     """Compute block device status.
4049
4050     """
4051     self.cfg.SetDiskID(dev, instance.primary_node)
4052     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4053     if dev.dev_type in constants.LDS_DRBD:
4054       # we change the snode then (otherwise we use the one passed in)
4055       if dev.logical_id[0] == instance.primary_node:
4056         snode = dev.logical_id[1]
4057       else:
4058         snode = dev.logical_id[0]
4059
4060     if snode:
4061       self.cfg.SetDiskID(dev, snode)
4062       dev_sstatus = rpc.call_blockdev_find(snode, dev)
4063     else:
4064       dev_sstatus = None
4065
4066     if dev.children:
4067       dev_children = [self._ComputeDiskStatus(instance, snode, child)
4068                       for child in dev.children]
4069     else:
4070       dev_children = []
4071
4072     data = {
4073       "iv_name": dev.iv_name,
4074       "dev_type": dev.dev_type,
4075       "logical_id": dev.logical_id,
4076       "physical_id": dev.physical_id,
4077       "pstatus": dev_pstatus,
4078       "sstatus": dev_sstatus,
4079       "children": dev_children,
4080       }
4081
4082     return data
4083
4084   def Exec(self, feedback_fn):
4085     """Gather and return data"""
4086     result = {}
4087     for instance in self.wanted_instances:
4088       remote_info = rpc.call_instance_info(instance.primary_node,
4089                                                 instance.name)
4090       if remote_info and "state" in remote_info:
4091         remote_state = "up"
4092       else:
4093         remote_state = "down"
4094       if instance.status == "down":
4095         config_state = "down"
4096       else:
4097         config_state = "up"
4098
4099       disks = [self._ComputeDiskStatus(instance, None, device)
4100                for device in instance.disks]
4101
4102       idict = {
4103         "name": instance.name,
4104         "config_state": config_state,
4105         "run_state": remote_state,
4106         "pnode": instance.primary_node,
4107         "snodes": instance.secondary_nodes,
4108         "os": instance.os,
4109         "memory": instance.memory,
4110         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4111         "disks": disks,
4112         "network_port": instance.network_port,
4113         "vcpus": instance.vcpus,
4114         "kernel_path": instance.kernel_path,
4115         "initrd_path": instance.initrd_path,
4116         "hvm_boot_order": instance.hvm_boot_order,
4117         }
4118
4119       result[instance.name] = idict
4120
4121     return result
4122
4123
4124 class LUSetInstanceParms(LogicalUnit):
4125   """Modifies an instances's parameters.
4126
4127   """
4128   HPATH = "instance-modify"
4129   HTYPE = constants.HTYPE_INSTANCE
4130   _OP_REQP = ["instance_name"]
4131
4132   def BuildHooksEnv(self):
4133     """Build hooks env.
4134
4135     This runs on the master, primary and secondaries.
4136
4137     """
4138     args = dict()
4139     if self.mem:
4140       args['memory'] = self.mem
4141     if self.vcpus:
4142       args['vcpus'] = self.vcpus
4143     if self.do_ip or self.do_bridge or self.mac:
4144       if self.do_ip:
4145         ip = self.ip
4146       else:
4147         ip = self.instance.nics[0].ip
4148       if self.bridge:
4149         bridge = self.bridge
4150       else:
4151         bridge = self.instance.nics[0].bridge
4152       if self.mac:
4153         mac = self.mac
4154       else:
4155         mac = self.instance.nics[0].mac
4156       args['nics'] = [(ip, bridge, mac)]
4157     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4158     nl = [self.sstore.GetMasterNode(),
4159           self.instance.primary_node] + list(self.instance.secondary_nodes)
4160     return env, nl, nl
4161
4162   def CheckPrereq(self):
4163     """Check prerequisites.
4164
4165     This only checks the instance list against the existing names.
4166
4167     """
4168     self.mem = getattr(self.op, "mem", None)
4169     self.vcpus = getattr(self.op, "vcpus", None)
4170     self.ip = getattr(self.op, "ip", None)
4171     self.mac = getattr(self.op, "mac", None)
4172     self.bridge = getattr(self.op, "bridge", None)
4173     self.kernel_path = getattr(self.op, "kernel_path", None)
4174     self.initrd_path = getattr(self.op, "initrd_path", None)
4175     self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4176     all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4177                  self.kernel_path, self.initrd_path, self.hvm_boot_order]
4178     if all_parms.count(None) == len(all_parms):
4179       raise errors.OpPrereqError("No changes submitted")
4180     if self.mem is not None:
4181       try:
4182         self.mem = int(self.mem)
4183       except ValueError, err:
4184         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4185     if self.vcpus is not None:
4186       try:
4187         self.vcpus = int(self.vcpus)
4188       except ValueError, err:
4189         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4190     if self.ip is not None:
4191       self.do_ip = True
4192       if self.ip.lower() == "none":
4193         self.ip = None
4194       else:
4195         if not utils.IsValidIP(self.ip):
4196           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4197     else:
4198       self.do_ip = False
4199     self.do_bridge = (self.bridge is not None)
4200     if self.mac is not None:
4201       if self.cfg.IsMacInUse(self.mac):
4202         raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4203                                    self.mac)
4204       if not utils.IsValidMac(self.mac):
4205         raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4206
4207     if self.kernel_path is not None:
4208       self.do_kernel_path = True
4209       if self.kernel_path == constants.VALUE_NONE:
4210         raise errors.OpPrereqError("Can't set instance to no kernel")
4211
4212       if self.kernel_path != constants.VALUE_DEFAULT:
4213         if not os.path.isabs(self.kernel_path):
4214           raise errors.OpPrereqError("The kernel path must be an absolute"
4215                                     " filename")
4216     else:
4217       self.do_kernel_path = False
4218
4219     if self.initrd_path is not None:
4220       self.do_initrd_path = True
4221       if self.initrd_path not in (constants.VALUE_NONE,
4222                                   constants.VALUE_DEFAULT):
4223         if not os.path.isabs(self.initrd_path):
4224           raise errors.OpPrereqError("The initrd path must be an absolute"
4225                                     " filename")
4226     else:
4227       self.do_initrd_path = False
4228
4229     # boot order verification
4230     if self.hvm_boot_order is not None:
4231       if self.hvm_boot_order != constants.VALUE_DEFAULT:
4232         if len(self.hvm_boot_order.strip("acdn")) != 0:
4233           raise errors.OpPrereqError("invalid boot order specified,"
4234                                      " must be one or more of [acdn]"
4235                                      " or 'default'")
4236
4237     instance = self.cfg.GetInstanceInfo(
4238       self.cfg.ExpandInstanceName(self.op.instance_name))
4239     if instance is None:
4240       raise errors.OpPrereqError("No such instance name '%s'" %
4241                                  self.op.instance_name)
4242     self.op.instance_name = instance.name
4243     self.instance = instance
4244     return
4245
4246   def Exec(self, feedback_fn):
4247     """Modifies an instance.
4248
4249     All parameters take effect only at the next restart of the instance.
4250     """
4251     result = []
4252     instance = self.instance
4253     if self.mem:
4254       instance.memory = self.mem
4255       result.append(("mem", self.mem))
4256     if self.vcpus:
4257       instance.vcpus = self.vcpus
4258       result.append(("vcpus",  self.vcpus))
4259     if self.do_ip:
4260       instance.nics[0].ip = self.ip
4261       result.append(("ip", self.ip))
4262     if self.bridge:
4263       instance.nics[0].bridge = self.bridge
4264       result.append(("bridge", self.bridge))
4265     if self.mac:
4266       instance.nics[0].mac = self.mac
4267       result.append(("mac", self.mac))
4268     if self.do_kernel_path:
4269       instance.kernel_path = self.kernel_path
4270       result.append(("kernel_path", self.kernel_path))
4271     if self.do_initrd_path:
4272       instance.initrd_path = self.initrd_path
4273       result.append(("initrd_path", self.initrd_path))
4274     if self.hvm_boot_order:
4275       if self.hvm_boot_order == constants.VALUE_DEFAULT:
4276         instance.hvm_boot_order = None
4277       else:
4278         instance.hvm_boot_order = self.hvm_boot_order
4279       result.append(("hvm_boot_order", self.hvm_boot_order))
4280
4281     self.cfg.AddInstance(instance)
4282
4283     return result
4284
4285
4286 class LUQueryExports(NoHooksLU):
4287   """Query the exports list
4288
4289   """
4290   _OP_REQP = []
4291
4292   def CheckPrereq(self):
4293     """Check that the nodelist contains only existing nodes.
4294
4295     """
4296     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4297
4298   def Exec(self, feedback_fn):
4299     """Compute the list of all the exported system images.
4300
4301     Returns:
4302       a dictionary with the structure node->(export-list)
4303       where export-list is a list of the instances exported on
4304       that node.
4305
4306     """
4307     return rpc.call_export_list(self.nodes)
4308
4309
4310 class LUExportInstance(LogicalUnit):
4311   """Export an instance to an image in the cluster.
4312
4313   """
4314   HPATH = "instance-export"
4315   HTYPE = constants.HTYPE_INSTANCE
4316   _OP_REQP = ["instance_name", "target_node", "shutdown"]
4317
4318   def BuildHooksEnv(self):
4319     """Build hooks env.
4320
4321     This will run on the master, primary node and target node.
4322
4323     """
4324     env = {
4325       "EXPORT_NODE": self.op.target_node,
4326       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4327       }
4328     env.update(_BuildInstanceHookEnvByObject(self.instance))
4329     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4330           self.op.target_node]
4331     return env, nl, nl
4332
4333   def CheckPrereq(self):
4334     """Check prerequisites.
4335
4336     This checks that the instance name is a valid one.
4337
4338     """
4339     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4340     self.instance = self.cfg.GetInstanceInfo(instance_name)
4341     if self.instance is None:
4342       raise errors.OpPrereqError("Instance '%s' not found" %
4343                                  self.op.instance_name)
4344
4345     # node verification
4346     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4347     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4348
4349     if self.dst_node is None:
4350       raise errors.OpPrereqError("Destination node '%s' is unknown." %
4351                                  self.op.target_node)
4352     self.op.target_node = self.dst_node.name
4353
4354   def Exec(self, feedback_fn):
4355     """Export an instance to an image in the cluster.
4356
4357     """
4358     instance = self.instance
4359     dst_node = self.dst_node
4360     src_node = instance.primary_node
4361     # shutdown the instance, unless requested not to do so
4362     if self.op.shutdown:
4363       op = opcodes.OpShutdownInstance(instance_name=instance.name)
4364       self.proc.ChainOpCode(op)
4365
4366     vgname = self.cfg.GetVGName()
4367
4368     snap_disks = []
4369
4370     try:
4371       for disk in instance.disks:
4372         if disk.iv_name == "sda":
4373           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4374           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4375
4376           if not new_dev_name:
4377             logger.Error("could not snapshot block device %s on node %s" %
4378                          (disk.logical_id[1], src_node))
4379           else:
4380             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4381                                       logical_id=(vgname, new_dev_name),
4382                                       physical_id=(vgname, new_dev_name),
4383                                       iv_name=disk.iv_name)
4384             snap_disks.append(new_dev)
4385
4386     finally:
4387       if self.op.shutdown:
4388         op = opcodes.OpStartupInstance(instance_name=instance.name,
4389                                        force=False)
4390         self.proc.ChainOpCode(op)
4391
4392     # TODO: check for size
4393
4394     for dev in snap_disks:
4395       if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
4396                                            instance):
4397         logger.Error("could not export block device %s from node"
4398                      " %s to node %s" %
4399                      (dev.logical_id[1], src_node, dst_node.name))
4400       if not rpc.call_blockdev_remove(src_node, dev):
4401         logger.Error("could not remove snapshot block device %s from"
4402                      " node %s" % (dev.logical_id[1], src_node))
4403
4404     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4405       logger.Error("could not finalize export for instance %s on node %s" %
4406                    (instance.name, dst_node.name))
4407
4408     nodelist = self.cfg.GetNodeList()
4409     nodelist.remove(dst_node.name)
4410
4411     # on one-node clusters nodelist will be empty after the removal
4412     # if we proceed the backup would be removed because OpQueryExports
4413     # substitutes an empty list with the full cluster node list.
4414     if nodelist:
4415       op = opcodes.OpQueryExports(nodes=nodelist)
4416       exportlist = self.proc.ChainOpCode(op)
4417       for node in exportlist:
4418         if instance.name in exportlist[node]:
4419           if not rpc.call_export_remove(node, instance.name):
4420             logger.Error("could not remove older export for instance %s"
4421                          " on node %s" % (instance.name, node))
4422
4423
4424 class TagsLU(NoHooksLU):
4425   """Generic tags LU.
4426
4427   This is an abstract class which is the parent of all the other tags LUs.
4428
4429   """
4430   def CheckPrereq(self):
4431     """Check prerequisites.
4432
4433     """
4434     if self.op.kind == constants.TAG_CLUSTER:
4435       self.target = self.cfg.GetClusterInfo()
4436     elif self.op.kind == constants.TAG_NODE:
4437       name = self.cfg.ExpandNodeName(self.op.name)
4438       if name is None:
4439         raise errors.OpPrereqError("Invalid node name (%s)" %
4440                                    (self.op.name,))
4441       self.op.name = name
4442       self.target = self.cfg.GetNodeInfo(name)
4443     elif self.op.kind == constants.TAG_INSTANCE:
4444       name = self.cfg.ExpandInstanceName(self.op.name)
4445       if name is None:
4446         raise errors.OpPrereqError("Invalid instance name (%s)" %
4447                                    (self.op.name,))
4448       self.op.name = name
4449       self.target = self.cfg.GetInstanceInfo(name)
4450     else:
4451       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4452                                  str(self.op.kind))
4453
4454
4455 class LUGetTags(TagsLU):
4456   """Returns the tags of a given object.
4457
4458   """
4459   _OP_REQP = ["kind", "name"]
4460
4461   def Exec(self, feedback_fn):
4462     """Returns the tag list.
4463
4464     """
4465     return self.target.GetTags()
4466
4467
4468 class LUSearchTags(NoHooksLU):
4469   """Searches the tags for a given pattern.
4470
4471   """
4472   _OP_REQP = ["pattern"]
4473
4474   def CheckPrereq(self):
4475     """Check prerequisites.
4476
4477     This checks the pattern passed for validity by compiling it.
4478
4479     """
4480     try:
4481       self.re = re.compile(self.op.pattern)
4482     except re.error, err:
4483       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4484                                  (self.op.pattern, err))
4485
4486   def Exec(self, feedback_fn):
4487     """Returns the tag list.
4488
4489     """
4490     cfg = self.cfg
4491     tgts = [("/cluster", cfg.GetClusterInfo())]
4492     ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4493     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4494     nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4495     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4496     results = []
4497     for path, target in tgts:
4498       for tag in target.GetTags():
4499         if self.re.search(tag):
4500           results.append((path, tag))
4501     return results
4502
4503
4504 class LUAddTags(TagsLU):
4505   """Sets a tag on a given object.
4506
4507   """
4508   _OP_REQP = ["kind", "name", "tags"]
4509
4510   def CheckPrereq(self):
4511     """Check prerequisites.
4512
4513     This checks the type and length of the tag name and value.
4514
4515     """
4516     TagsLU.CheckPrereq(self)
4517     for tag in self.op.tags:
4518       objects.TaggableObject.ValidateTag(tag)
4519
4520   def Exec(self, feedback_fn):
4521     """Sets the tag.
4522
4523     """
4524     try:
4525       for tag in self.op.tags:
4526         self.target.AddTag(tag)
4527     except errors.TagError, err:
4528       raise errors.OpExecError("Error while setting tag: %s" % str(err))
4529     try:
4530       self.cfg.Update(self.target)
4531     except errors.ConfigurationError:
4532       raise errors.OpRetryError("There has been a modification to the"
4533                                 " config file and the operation has been"
4534                                 " aborted. Please retry.")
4535
4536
4537 class LUDelTags(TagsLU):
4538   """Delete a list of tags from a given object.
4539
4540   """
4541   _OP_REQP = ["kind", "name", "tags"]
4542
4543   def CheckPrereq(self):
4544     """Check prerequisites.
4545
4546     This checks that we have the given tag.
4547
4548     """
4549     TagsLU.CheckPrereq(self)
4550     for tag in self.op.tags:
4551       objects.TaggableObject.ValidateTag(tag)
4552     del_tags = frozenset(self.op.tags)
4553     cur_tags = self.target.GetTags()
4554     if not del_tags <= cur_tags:
4555       diff_tags = del_tags - cur_tags
4556       diff_names = ["'%s'" % tag for tag in diff_tags]
4557       diff_names.sort()
4558       raise errors.OpPrereqError("Tag(s) %s not found" %
4559                                  (",".join(diff_names)))
4560
4561   def Exec(self, feedback_fn):
4562     """Remove the tag from the object.
4563
4564     """
4565     for tag in self.op.tags:
4566       self.target.RemoveTag(tag)
4567     try:
4568       self.cfg.Update(self.target)
4569     except errors.ConfigurationError:
4570       raise errors.OpRetryError("There has been a modification to the"
4571                                 " config file and the operation has been"
4572                                 " aborted. Please retry.")
4573
4574 class LUTestDelay(NoHooksLU):
4575   """Sleep for a specified amount of time.
4576
4577   This LU sleeps on the master and/or nodes for a specified amoutn of
4578   time.
4579
4580   """
4581   _OP_REQP = ["duration", "on_master", "on_nodes"]
4582
4583   def CheckPrereq(self):
4584     """Check prerequisites.
4585
4586     This checks that we have a good list of nodes and/or the duration
4587     is valid.
4588
4589     """
4590
4591     if self.op.on_nodes:
4592       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
4593
4594   def Exec(self, feedback_fn):
4595     """Do the actual sleep.
4596
4597     """
4598     if self.op.on_master:
4599       if not utils.TestDelay(self.op.duration):
4600         raise errors.OpExecError("Error during master delay test")
4601     if self.op.on_nodes:
4602       result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
4603       if not result:
4604         raise errors.OpExecError("Complete failure from rpc call")
4605       for node, node_result in result.items():
4606         if not node_result:
4607           raise errors.OpExecError("Failure during rpc call to node %s,"
4608                                    " result: %s" % (node, node_result))