4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Client-side RPC library for calling procedures on cluster nodes.
26 # pylint: disable-msg=C0103
30 from twisted.internet.pollreactor import PollReactor
32 class ReReactor(PollReactor):
33 """A re-startable Reactor implementation.
36 def run(self, installSignalHandlers=1):
39 This is a customized run that, before calling Reactor.run, will
40 reinstall the shutdown events and re-create the threadpool in case
41 these are not present (as will happen on the second run of the
45 if not 'shutdown' in self._eventTriggers:
46 # the shutdown queue has been killed, we are most probably
47 # at the second run, thus recreate the queue
48 self.addSystemEventTrigger('during', 'shutdown', self.crash)
49 self.addSystemEventTrigger('during', 'shutdown', self.disconnectAll)
50 if self.threadpool is not None and self.threadpool.joined == 1:
51 # in case the threadpool has been stopped, re-start it
52 # and add a trigger to stop it at reactor shutdown
53 self.threadpool.start()
54 self.addSystemEventTrigger('during', 'shutdown', self.threadpool.stop)
56 return PollReactor.run(self, installSignalHandlers)
59 import twisted.internet.main
60 twisted.internet.main.installReactor(ReReactor())
62 from twisted.spread import pb
63 from twisted.internet import reactor
64 from twisted.cred import credentials
65 from OpenSSL import SSL, crypto
67 from ganeti import logger
68 from ganeti import utils
69 from ganeti import errors
70 from ganeti import constants
71 from ganeti import objects
72 from ganeti import ssconf
75 """Node-handling class.
77 For each node that we speak with, we create an instance of this
78 class, so that we have a safe place to store the details of this
82 def __init__(self, parent, node):
87 """Stop the reactor if we got all the results.
90 if len(self.parent.results) == len(self.parent.nc):
93 def cb_call(self, obj):
94 """Callback for successful connect.
96 If the connect and login sequence succeeded, we proceed with
97 making the actual call.
100 deferred = obj.callRemote(self.parent.procedure, self.parent.args)
101 deferred.addCallbacks(self.cb_done, self.cb_err2)
103 def cb_done(self, result):
104 """Callback for successful call.
106 When we receive the result from a call, we check if it was an
107 error and if so we raise a generic RemoteError (we can't yet pass
108 the actual exception over). If there was no error, we store the
112 tb, self.parent.results[self.node] = result
115 raise errors.RemoteError("Remote procedure error calling %s on %s:"
116 "\n%s" % (self.parent.procedure,
120 def cb_err1(self, reason):
121 """Error callback for unsuccessful connect.
124 logger.Error("caller_connect: could not connect to remote host %s,"
125 " reason %s" % (self.node, reason))
126 self.parent.results[self.node] = False
129 def cb_err2(self, reason):
130 """Error callback for unsuccessful call.
132 This is when the call didn't return anything, not even an error,
133 or when it timed out, etc.
136 logger.Error("caller_call: could not call %s on node %s,"
137 " reason %s" % (self.parent.procedure, self.node, reason))
138 self.parent.results[self.node] = False
142 class MirrorContextFactory:
143 """Certificate verifier factory.
145 This factory creates contexts that verify if the remote end has a
146 specific certificate (i.e. our own certificate).
148 The checks we do are that the PEM dump of the certificate is the
149 same as our own and (somewhat redundantly) that the SHA checksum is
157 fd = open(constants.SSL_CERT_FILE, 'r')
159 data = fd.read(16384)
162 except EnvironmentError, err:
163 raise errors.ConfigurationError("missing SSL certificate: %s" %
165 self.mycert = crypto.load_certificate(crypto.FILETYPE_PEM, data)
166 self.mypem = crypto.dump_certificate(crypto.FILETYPE_PEM, self.mycert)
167 self.mydigest = self.mycert.digest('SHA')
169 def verifier(self, conn, x509, errno, err_depth, retcode):
170 """Certificate verify method.
173 if self.mydigest != x509.digest('SHA'):
175 if crypto.dump_certificate(crypto.FILETYPE_PEM, x509) != self.mypem:
179 def getContext(self):
180 """Context generator.
183 context = SSL.Context(SSL.TLSv1_METHOD)
184 context.set_verify(SSL.VERIFY_PEER, self.verifier)
190 This class, given a (remote) method name, a list of parameters and a
191 list of nodes, will contact (in parallel) all nodes, and return a
192 dict of results (key: node name, value: result).
194 One current bug is that generic failure is still signalled by
195 'False' result, which is not good. This overloading of values can
203 def __init__(self, procedure, args):
204 ss = ssconf.SimpleStore()
205 self.port = ss.GetNodeDaemonPort()
206 self.nodepw = ss.GetNodeDaemonPassword()
209 self.procedure = procedure
212 #--- generic connector -------------
214 def connect_list(self, node_list):
215 """Add a list of nodes to the target nodes.
218 for node in node_list:
221 def connect(self, connect_node):
222 """Add a node to the target list.
225 factory = pb.PBClientFactory()
226 self.nc[connect_node] = nc = NodeController(self, connect_node)
227 reactor.connectSSL(connect_node, self.port, factory,
228 MirrorContextFactory())
229 #d = factory.getRootObject()
230 d = factory.login(credentials.UsernamePassword("master_node", self.nodepw))
231 d.addCallbacks(nc.cb_call, nc.cb_err1)
234 """Return the results of the call.
240 """Wrapper over reactor.run().
242 This function simply calls reactor.run() if we have any requests
243 queued, otherwise it does nothing.
250 def call_volume_list(node_list, vg_name):
251 """Gets the logical volumes present in a given volume group.
253 This is a multi-node call.
256 c = Client("volume_list", [vg_name])
257 c.connect_list(node_list)
262 def call_vg_list(node_list):
263 """Gets the volume group list.
265 This is a multi-node call.
268 c = Client("vg_list", [])
269 c.connect_list(node_list)
274 def call_bridges_exist(node, bridges_list):
275 """Checks if a node has all the bridges given.
277 This method checks if all bridges given in the bridges_list are
278 present on the remote node, so that an instance that uses interfaces
279 on those bridges can be started.
281 This is a single-node call.
284 c = Client("bridges_exist", [bridges_list])
287 return c.getresult().get(node, False)
290 def call_instance_start(node, instance, extra_args):
291 """Starts an instance.
293 This is a single-node call.
296 c = Client("instance_start", [instance.ToDict(), extra_args])
299 return c.getresult().get(node, False)
302 def call_instance_shutdown(node, instance):
303 """Stops an instance.
305 This is a single-node call.
308 c = Client("instance_shutdown", [instance.ToDict()])
311 return c.getresult().get(node, False)
314 def call_instance_os_add(node, inst, osdev, swapdev):
315 """Installs an OS on the given instance.
317 This is a single-node call.
320 params = [inst.ToDict(), osdev, swapdev]
321 c = Client("instance_os_add", params)
324 return c.getresult().get(node, False)
327 def call_instance_run_rename(node, inst, old_name, osdev, swapdev):
328 """Run the OS rename script for an instance.
330 This is a single-node call.
333 params = [inst.ToDict(), old_name, osdev, swapdev]
334 c = Client("instance_run_rename", params)
337 return c.getresult().get(node, False)
340 def call_instance_info(node, instance):
341 """Returns information about a single instance.
343 This is a single-node call.
346 c = Client("instance_info", [instance])
349 return c.getresult().get(node, False)
352 def call_all_instances_info(node_list):
353 """Returns information about all instances on the given nodes.
355 This is a multi-node call.
358 c = Client("all_instances_info", [])
359 c.connect_list(node_list)
364 def call_instance_list(node_list):
365 """Returns the list of running instances on the given nodes.
367 This is a multi-node call.
370 c = Client("instance_list", [])
371 c.connect_list(node_list)
376 def call_node_info(node_list, vg_name):
377 """Return node information.
379 This will return memory information and volume group size and free
382 This is a multi-node call.
385 c = Client("node_info", [vg_name])
386 c.connect_list(node_list)
388 retux = c.getresult()
390 for node_name in retux:
391 ret = retux.get(node_name, False)
392 if type(ret) != dict:
393 logger.Error("could not connect to node %s" % (node_name))
397 { 'memory_total' : '-',
400 'vg_size' : 'node_unreachable',
407 def call_node_add(node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
408 """Add a node to the cluster.
410 This is a single-node call.
413 params = [dsa, dsapub, rsa, rsapub, ssh, sshpub]
414 c = Client("node_add", params)
417 return c.getresult().get(node, False)
420 def call_node_verify(node_list, checkdict):
421 """Request verification of given parameters.
423 This is a multi-node call.
426 c = Client("node_verify", [checkdict])
427 c.connect_list(node_list)
432 def call_node_start_master(node):
433 """Tells a node to activate itself as a master.
435 This is a single-node call.
438 c = Client("node_start_master", [])
441 return c.getresult().get(node, False)
444 def call_node_stop_master(node):
445 """Tells a node to demote itself from master status.
447 This is a single-node call.
450 c = Client("node_stop_master", [])
453 return c.getresult().get(node, False)
456 def call_version(node_list):
457 """Query node version.
459 This is a multi-node call.
462 c = Client("version", [])
463 c.connect_list(node_list)
468 def call_blockdev_create(node, bdev, size, on_primary, info):
469 """Request creation of a given block device.
471 This is a single-node call.
474 params = [bdev.ToDict(), size, on_primary, info]
475 c = Client("blockdev_create", params)
478 return c.getresult().get(node, False)
481 def call_blockdev_remove(node, bdev):
482 """Request removal of a given block device.
484 This is a single-node call.
487 c = Client("blockdev_remove", [bdev.ToDict()])
490 return c.getresult().get(node, False)
493 def call_blockdev_assemble(node, disk, on_primary):
494 """Request assembling of a given block device.
496 This is a single-node call.
499 params = [disk.ToDict(), on_primary]
500 c = Client("blockdev_assemble", params)
503 return c.getresult().get(node, False)
506 def call_blockdev_shutdown(node, disk):
507 """Request shutdown of a given block device.
509 This is a single-node call.
512 c = Client("blockdev_shutdown", [disk.ToDict()])
515 return c.getresult().get(node, False)
518 def call_blockdev_addchild(node, bdev, ndev):
519 """Request adding a new child to a (mirroring) device.
521 This is a single-node call.
524 params = [bdev.ToDict(), ndev.ToDict()]
525 c = Client("blockdev_addchild", params)
528 return c.getresult().get(node, False)
531 def call_blockdev_removechild(node, bdev, ndev):
532 """Request removing a child from a (mirroring) device.
534 This is a single-node call.
537 params = [bdev.ToDict(), ndev.ToDict()]
538 c = Client("blockdev_removechild", params)
541 return c.getresult().get(node, False)
544 def call_blockdev_getmirrorstatus(node, disks):
545 """Request status of a (mirroring) device.
547 This is a single-node call.
550 params = [dsk.ToDict() for dsk in disks]
551 c = Client("blockdev_getmirrorstatus", params)
554 return c.getresult().get(node, False)
557 def call_blockdev_find(node, disk):
558 """Request identification of a given block device.
560 This is a single-node call.
563 c = Client("blockdev_find", [disk.ToDict()])
566 return c.getresult().get(node, False)
569 def call_upload_file(node_list, file_name):
572 The node will refuse the operation in case the file is not on the
575 This is a multi-node call.
583 st = os.stat(file_name)
584 params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
585 st.st_atime, st.st_mtime]
586 c = Client("upload_file", params)
587 c.connect_list(node_list)
592 def call_os_diagnose(node_list):
593 """Request a diagnose of OS definitions.
595 This is a multi-node call.
598 c = Client("os_diagnose", [])
599 c.connect_list(node_list)
601 result = c.getresult()
603 for node_name in result:
605 if result[node_name]:
606 for data in result[node_name]:
608 if isinstance(data, dict):
609 nr.append(objects.OS.FromDict(data))
610 elif isinstance(data, tuple) and len(data) == 3:
611 nr.append(errors.InvalidOS(data[0], data[1], data[2]))
613 raise errors.ProgrammerError("Invalid data from"
614 " xcserver.os_diagnose")
615 new_result[node_name] = nr
619 def call_os_get(node_list, name):
620 """Returns an OS definition.
622 This is a multi-node call.
625 c = Client("os_get", [name])
626 c.connect_list(node_list)
628 result = c.getresult()
630 for node_name in result:
631 data = result[node_name]
632 if isinstance(data, dict):
633 new_result[node_name] = objects.OS.FromDict(data)
634 elif isinstance(data, tuple) and len(data) == 3:
635 new_result[node_name] = errors.InvalidOS(data[0], data[1], data[2])
637 new_result[node_name] = data
641 def call_hooks_runner(node_list, hpath, phase, env):
642 """Call the hooks runner.
645 - hpath, phase: forwarded unchanged to the remote hooks runner
646 - env: a dictionary with the environment
648 This is a multi-node call.
651 params = [hpath, phase, env]
652 c = Client("hooks_runner", params)
653 c.connect_list(node_list)
655 result = c.getresult()
659 def call_blockdev_snapshot(node, cf_bdev):
660 """Request a snapshot of the given block device.
662 This is a single-node call.
665 c = Client("blockdev_snapshot", [cf_bdev.ToDict()])
668 return c.getresult().get(node, False)
671 def call_snapshot_export(node, snap_bdev, dest_node, instance):
672 """Request the export of a given snapshot.
674 This is a single-node call.
677 params = [snap_bdev.ToDict(), dest_node, instance.ToDict()]
678 c = Client("snapshot_export", params)
681 return c.getresult().get(node, False)
684 def call_finalize_export(node, instance, snap_disks):
685 """Request the completion of an export operation.
687 This writes the export config file, etc.
689 This is a single-node call.
693 for disk in snap_disks:
694 flat_disks.append(disk.ToDict())
695 params = [instance.ToDict(), flat_disks]
696 c = Client("finalize_export", params)
699 return c.getresult().get(node, False)
702 def call_export_info(node, path):
703 """Queries the export information in a given path.
705 This is a single-node call.
708 c = Client("export_info", [path])
711 result = c.getresult().get(node, False)
714 return objects.SerializableConfigParser.Loads(result)
717 def call_instance_os_import(node, inst, osdev, swapdev, src_node, src_image):
718 """Request the import of a backup into an instance.
720 This is a single-node call.
723 params = [inst.ToDict(), osdev, swapdev, src_node, src_image]
724 c = Client("instance_os_import", params)
727 return c.getresult().get(node, False)
730 def call_export_list(node_list):
731 """Gets the stored exports list.
733 This is a multi-node call.
736 c = Client("export_list", [])
737 c.connect_list(node_list)
739 result = c.getresult()
743 def call_export_remove(node, export):
744 """Requests removal of a given export.
746 This is a single-node call.
749 c = Client("export_remove", [export])
752 return c.getresult().get(node, False)
755 def call_node_leave_cluster(node):
756 """Requests a node to clean the cluster information it has.
758 This will remove the configuration information from the ganeti data
761 This is a single-node call.
764 c = Client("node_leave_cluster", [])
767 return c.getresult().get(node, False)
770 def call_node_volumes(node_list):
771 """Gets all volumes on node(s).
773 This is a multi-node call.
776 c = Client("node_volumes", [])
777 c.connect_list(node_list)