Style changes for pep-8 and python-3000 compliance.
[ganeti-local] / lib / rpc.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script to show add a new node to the cluster
23
24 """
25
26 # pylint: disable-msg=C0103
27
28 import os
29
30 from twisted.internet.pollreactor import PollReactor
31
32 class ReReactor(PollReactor):
33   """A re-startable Reactor implementation.
34
35   """
36   def run(self, installSignalHandlers=1):
37     """Custom run method.
38
39     This is customized run that, before calling Reactor.run, will
40     reinstall the shutdown events and re-create the threadpool in case
41     these are not present (as will happen on the second run of the
42     reactor).
43
44     """
45     if not 'shutdown' in self._eventTriggers:
46       # the shutdown queue has been killed, we are most probably
47       # at the second run, thus recreate the queue
48       self.addSystemEventTrigger('during', 'shutdown', self.crash)
49       self.addSystemEventTrigger('during', 'shutdown', self.disconnectAll)
50     if self.threadpool is not None and self.threadpool.joined == 1:
51       # in case the threadpool has been stopped, re-start it
52       # and add a trigger to stop it at reactor shutdown
53       self.threadpool.start()
54       self.addSystemEventTrigger('during', 'shutdown', self.threadpool.stop)
55
56     return PollReactor.run(self, installSignalHandlers)
57
58
59 import twisted.internet.main
60 twisted.internet.main.installReactor(ReReactor())
61
62 from twisted.spread import pb
63 from twisted.internet import reactor
64 from twisted.cred import credentials
65 from OpenSSL import SSL, crypto
66
67 from ganeti import logger
68 from ganeti import utils
69 from ganeti import errors
70 from ganeti import constants
71 from ganeti import objects
72 from ganeti import ssconf
73
74 class NodeController:
75   """Node-handling class.
76
77   For each node that we speak with, we create an instance of this
78   class, so that we have a safe place to store the details of this
79   individual call.
80
81   """
82   def __init__(self, parent, node):
83     self.parent = parent
84     self.node = node
85
86   def _check_end(self):
87     """Stop the reactor if we got all the results.
88
89     """
90     if len(self.parent.results) == len(self.parent.nc):
91       reactor.stop()
92
93   def cb_call(self, obj):
94     """Callback for successfull connect.
95
96     If the connect and login sequence succeeded, we proceed with
97     making the actual call.
98
99     """
100     deferred = obj.callRemote(self.parent.procedure, self.parent.args)
101     deferred.addCallbacks(self.cb_done, self.cb_err2)
102
103   def cb_done(self, result):
104     """Callback for successful call.
105
106     When we receive the result from a call, we check if it was an
107     error and if so we raise a generic RemoteError (we can't pass yet
108     the actual exception over). If there was no error, we store the
109     result.
110
111     """
112     tb, self.parent.results[self.node] = result
113     self._check_end()
114     if tb:
115       raise errors.RemoteError("Remote procedure error calling %s on %s:"
116                                "\n%s" % (self.parent.procedure,
117                                          self.node,
118                                          tb))
119
120   def cb_err1(self, reason):
121     """Error callback for unsuccessful connect.
122
123     """
124     logger.Error("caller_connect: could not connect to remote host %s,"
125                  " reason %s" % (self.node, reason))
126     self.parent.results[self.node] = False
127     self._check_end()
128
129   def cb_err2(self, reason):
130     """Error callback for unsuccessful call.
131
132     This is when the call didn't return anything, not even an error,
133     or when it time out, etc.
134
135     """
136     logger.Error("caller_call: could not call %s on node %s,"
137                  " reason %s" % (self.parent.procedure, self.node, reason))
138     self.parent.results[self.node] = False
139     self._check_end()
140
141
142 class MirrorContextFactory:
143   """Certificate verifier factory.
144
145   This factory creates contexts that verify if the remote end has a
146   specific certificate (i.e. our own certificate).
147
148   The checks we do are that the PEM dump of the certificate is the
149   same as our own and (somewhat redundantly) that the SHA checksum is
150   the same.
151
152   """
153   isClient = 1
154
155   def __init__(self):
156     try:
157       fd = open(constants.SSL_CERT_FILE, 'r')
158       try:
159         data = fd.read(16384)
160       finally:
161         fd.close()
162     except EnvironmentError, err:
163       raise errors.ConfigurationError("missing SSL certificate: %s" %
164                                       str(err))
165     self.mycert = crypto.load_certificate(crypto.FILETYPE_PEM, data)
166     self.mypem = crypto.dump_certificate(crypto.FILETYPE_PEM, self.mycert)
167     self.mydigest = self.mycert.digest('SHA')
168
169   def verifier(self, conn, x509, errno, err_depth, retcode):
170     """Certificate verify method.
171
172     """
173     if self.mydigest != x509.digest('SHA'):
174       return False
175     if crypto.dump_certificate(crypto.FILETYPE_PEM, x509) != self.mypem:
176       return False
177     return True
178
179   def getContext(self):
180     """Context generator.
181
182     """
183     context = SSL.Context(SSL.TLSv1_METHOD)
184     context.set_verify(SSL.VERIFY_PEER, self.verifier)
185     return context
186
187 class Client:
188   """RPC Client class.
189
190   This class, given a (remote) ethod name, a list of parameters and a
191   list of nodes, will contact (in parallel) all nodes, and return a
192   dict of results (key: node name, value: result).
193
194   One current bug is that generic failure is still signalled by
195   'False' result, which is not good. This overloading of values can
196   cause bugs.
197
198   """
199   result_set = False
200   result = False
201   allresult = []
202
203   def __init__(self, procedure, args):
204     ss = ssconf.SimpleStore()
205     self.port = ss.GetNodeDaemonPort()
206     self.nodepw = ss.GetNodeDaemonPassword()
207     self.nc = {}
208     self.results = {}
209     self.procedure = procedure
210     self.args = args
211
212   #--- generic connector -------------
213
214   def connect_list(self, node_list):
215     """Add a list of nodes to the target nodes.
216
217     """
218     for node in node_list:
219       self.connect(node)
220
221   def connect(self, connect_node):
222     """Add a node to the target list.
223
224     """
225     factory = pb.PBClientFactory()
226     self.nc[connect_node] = nc = NodeController(self, connect_node)
227     reactor.connectSSL(connect_node, self.port, factory,
228                        MirrorContextFactory())
229     #d = factory.getRootObject()
230     d = factory.login(credentials.UsernamePassword("master_node", self.nodepw))
231     d.addCallbacks(nc.cb_call, nc.cb_err1)
232
233   def getresult(self):
234     """Return the results of the call.
235
236     """
237     return self.results
238
239   def run(self):
240     """Wrapper over reactor.run().
241
242     This function simply calls reactor.run() if we have any requests
243     queued, otherwise it does nothing.
244
245     """
246     if self.nc:
247       reactor.run()
248
249
250 def call_volume_list(node_list, vg_name):
251   """Gets the logical volumes present in a given volume group.
252
253   This is a multi-node call.
254
255   """
256   c = Client("volume_list", [vg_name])
257   c.connect_list(node_list)
258   c.run()
259   return c.getresult()
260
261
262 def call_vg_list(node_list):
263   """Gets the volume group list.
264
265   This is a multi-node call.
266
267   """
268   c = Client("vg_list", [])
269   c.connect_list(node_list)
270   c.run()
271   return c.getresult()
272
273
274 def call_bridges_exist(node, bridges_list):
275   """Checks if a node has all the bridges given.
276
277   This method checks if all bridges given in the bridges_list are
278   present on the remote node, so that an instance that uses interfaces
279   on those bridges can be started.
280
281   This is a single-node call.
282
283   """
284   c = Client("bridges_exist", [bridges_list])
285   c.connect(node)
286   c.run()
287   return c.getresult().get(node, False)
288
289
290 def call_instance_start(node, instance, extra_args):
291   """Stars an instance.
292
293   This is a single-node call.
294
295   """
296   c = Client("instance_start", [instance.Dumps(), extra_args])
297   c.connect(node)
298   c.run()
299   return c.getresult().get(node, False)
300
301
302 def call_instance_shutdown(node, instance):
303   """Stops an instance.
304
305   This is a single-node call.
306
307   """
308   c = Client("instance_shutdown", [instance.Dumps()])
309   c.connect(node)
310   c.run()
311   return c.getresult().get(node, False)
312
313
314 def call_instance_os_add(node, inst, osdev, swapdev):
315   """Installs an OS on the given instance.
316
317   This is a single-node call.
318
319   """
320   params = [inst.Dumps(), osdev, swapdev]
321   c = Client("instance_os_add", params)
322   c.connect(node)
323   c.run()
324   return c.getresult().get(node, False)
325
326
327 def call_instance_info(node, instance):
328   """Returns information about a single instance.
329
330   This is a single-node call.
331
332   """
333   c = Client("instance_info", [instance])
334   c.connect(node)
335   c.run()
336   return c.getresult().get(node, False)
337
338
339 def call_all_instances_info(node_list):
340   """Returns information about all instances on a given node.
341
342   This is a single-node call.
343
344   """
345   c = Client("all_instances_info", [])
346   c.connect_list(node_list)
347   c.run()
348   return c.getresult()
349
350
351 def call_instance_list(node_list):
352   """Returns the list of running instances on a given node.
353
354   This is a single-node call.
355
356   """
357   c = Client("instance_list", [])
358   c.connect_list(node_list)
359   c.run()
360   return c.getresult()
361
362
363 def call_node_info(node_list, vg_name):
364   """Return node information.
365
366   This will return memory information and volume group size and free
367   space.
368
369   This is a multi-node call.
370
371   """
372   c = Client("node_info", [vg_name])
373   c.connect_list(node_list)
374   c.run()
375   retux = c.getresult()
376
377   for node_name in retux:
378     ret = retux.get(node_name, False)
379     if type(ret) != dict:
380       logger.Error("could not connect to node %s" % (node_name))
381       ret = {}
382
383     utils.CheckDict(ret,
384                     { 'memory_total' : '-',
385                       'memory_dom0' : '-',
386                       'memory_free' : '-',
387                       'vg_size' : 'node_unreachable',
388                       'vg_free' : '-' },
389                     "call_node_info",
390                     )
391   return retux
392
393
394 def call_node_add(node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
395   """Add a node to the cluster.
396
397   This is a single-node call.
398
399   """
400   params = [dsa, dsapub, rsa, rsapub, ssh, sshpub]
401   c = Client("node_add", params)
402   c.connect(node)
403   c.run()
404   return c.getresult().get(node, False)
405
406
407 def call_node_verify(node_list, checkdict):
408   """Request verification of given parameters.
409
410   This is a multi-node call.
411
412   """
413   c = Client("node_verify", [checkdict])
414   c.connect_list(node_list)
415   c.run()
416   return c.getresult()
417
418
419 def call_node_start_master(node):
420   """Tells a node to activate itself as a master.
421
422   This is a single-node call.
423
424   """
425   c = Client("node_start_master", [])
426   c.connect(node)
427   c.run()
428   return c.getresult().get(node, False)
429
430
431 def call_node_stop_master(node):
432   """Tells a node to demote itself from master status.
433
434   This is a single-node call.
435
436   """
437   c = Client("node_stop_master", [])
438   c.connect(node)
439   c.run()
440   return c.getresult().get(node, False)
441
442
443 def call_version(node_list):
444   """Query node version.
445
446   This is a multi-node call.
447
448   """
449   c = Client("version", [])
450   c.connect_list(node_list)
451   c.run()
452   return c.getresult()
453
454
455 def call_blockdev_create(node, bdev, size, on_primary, info):
456   """Request creation of a given block device.
457
458   This is a single-node call.
459
460   """
461   params = [bdev.Dumps(), size, on_primary, info]
462   c = Client("blockdev_create", params)
463   c.connect(node)
464   c.run()
465   return c.getresult().get(node, False)
466
467
468 def call_blockdev_remove(node, bdev):
469   """Request removal of a given block device.
470
471   This is a single-node call.
472
473   """
474   c = Client("blockdev_remove", [bdev.Dumps()])
475   c.connect(node)
476   c.run()
477   return c.getresult().get(node, False)
478
479
480 def call_blockdev_assemble(node, disk, on_primary):
481   """Request assembling of a given block device.
482
483   This is a single-node call.
484
485   """
486   params = [disk.Dumps(), on_primary]
487   c = Client("blockdev_assemble", params)
488   c.connect(node)
489   c.run()
490   return c.getresult().get(node, False)
491
492
493 def call_blockdev_shutdown(node, disk):
494   """Request shutdown of a given block device.
495
496   This is a single-node call.
497
498   """
499   c = Client("blockdev_shutdown", [disk.Dumps()])
500   c.connect(node)
501   c.run()
502   return c.getresult().get(node, False)
503
504
505 def call_blockdev_addchild(node, bdev, ndev):
506   """Request adding a new child to a (mirroring) device.
507
508   This is a single-node call.
509
510   """
511   params = [bdev.Dumps(), ndev.Dumps()]
512   c = Client("blockdev_addchild", params)
513   c.connect(node)
514   c.run()
515   return c.getresult().get(node, False)
516
517
518 def call_blockdev_removechild(node, bdev, ndev):
519   """Request removing a new child from a (mirroring) device.
520
521   This is a single-node call.
522
523   """
524   params = [bdev.Dumps(), ndev.Dumps()]
525   c = Client("blockdev_removechild", params)
526   c.connect(node)
527   c.run()
528   return c.getresult().get(node, False)
529
530
531 def call_blockdev_getmirrorstatus(node, disks):
532   """Request status of a (mirroring) device.
533
534   This is a single-node call.
535
536   """
537   params = [dsk.Dumps() for dsk in disks]
538   c = Client("blockdev_getmirrorstatus", params)
539   c.connect(node)
540   c.run()
541   return c.getresult().get(node, False)
542
543
544 def call_blockdev_find(node, disk):
545   """Request identification of a given block device.
546
547   This is a single-node call.
548
549   """
550   c = Client("blockdev_find", [disk.Dumps()])
551   c.connect(node)
552   c.run()
553   return c.getresult().get(node, False)
554
555
556 def call_upload_file(node_list, file_name):
557   """Upload a file.
558
559   The node will refuse the operation in case the file is not on the
560   approved file list.
561
562   This is a multi-node call.
563
564   """
565   fh = file(file_name)
566   try:
567     data = fh.read()
568   finally:
569     fh.close()
570   st = os.stat(file_name)
571   params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
572             st.st_atime, st.st_mtime]
573   c = Client("upload_file", params)
574   c.connect_list(node_list)
575   c.run()
576   return c.getresult()
577
578
579 def call_os_diagnose(node_list):
580   """Request a diagnose of OS definitions.
581
582   This is a multi-node call.
583
584   """
585   c = Client("os_diagnose", [])
586   c.connect_list(node_list)
587   c.run()
588   result = c.getresult()
589   new_result = {}
590   for node_name in result:
591     nr = []
592     if result[node_name]:
593       for data in result[node_name]:
594         if data:
595           if isinstance(data, basestring):
596             nr.append(objects.ConfigObject.Loads(data))
597           elif isinstance(data, tuple) and len(data) == 2:
598             nr.append(errors.InvalidOS(data[0], data[1]))
599           else:
600             raise errors.ProgrammerError("Invalid data from"
601                                          " xcserver.os_diagnose")
602     new_result[node_name] = nr
603   return new_result
604
605
606 def call_os_get(node_list, name):
607   """Returns an OS definition.
608
609   This is a multi-node call.
610
611   """
612   c = Client("os_get", [name])
613   c.connect_list(node_list)
614   c.run()
615   result = c.getresult()
616   new_result = {}
617   for node_name in result:
618     data = result[node_name]
619     if isinstance(data, basestring):
620       new_result[node_name] = objects.ConfigObject.Loads(data)
621     elif isinstance(data, tuple) and len(data) == 2:
622       new_result[node_name] = errors.InvalidOS(data[0], data[1])
623     else:
624       new_result[node_name] = data
625   return new_result
626
627
628 def call_hooks_runner(node_list, hpath, phase, env):
629   """Call the hooks runner.
630
631   Args:
632     - op: the OpCode instance
633     - env: a dictionary with the environment
634
635   This is a multi-node call.
636
637   """
638   params = [hpath, phase, env]
639   c = Client("hooks_runner", params)
640   c.connect_list(node_list)
641   c.run()
642   result = c.getresult()
643   return result
644
645
646 def call_blockdev_snapshot(node, cf_bdev):
647   """Request a snapshot of the given block device.
648
649   This is a single-node call.
650
651   """
652   c = Client("blockdev_snapshot", [cf_bdev.Dumps()])
653   c.connect(node)
654   c.run()
655   return c.getresult().get(node, False)
656
657
658 def call_snapshot_export(node, snap_bdev, dest_node, instance):
659   """Request the export of a given snapshot.
660
661   This is a single-node call.
662
663   """
664   params = [snap_bdev.Dumps(), dest_node, instance.Dumps()]
665   c = Client("snapshot_export", params)
666   c.connect(node)
667   c.run()
668   return c.getresult().get(node, False)
669
670
671 def call_finalize_export(node, instance, snap_disks):
672   """Request the completion of an export operation.
673
674   This writes the export config file, etc.
675
676   This is a single-node call.
677
678   """
679   flat_disks = []
680   for disk in snap_disks:
681     flat_disks.append(disk.Dumps())
682   params = [instance.Dumps(), flat_disks]
683   c = Client("finalize_export", params)
684   c.connect(node)
685   c.run()
686   return c.getresult().get(node, False)
687
688
689 def call_export_info(node, path):
690   """Queries the export information in a given path.
691
692   This is a single-node call.
693
694   """
695   c = Client("export_info", [path])
696   c.connect(node)
697   c.run()
698   result = c.getresult().get(node, False)
699   if not result:
700     return result
701   return objects.SerializableConfigParser.Loads(result)
702
703
704 def call_instance_os_import(node, inst, osdev, swapdev, src_node, src_image):
705   """Request the import of a backup into an instance.
706
707   This is a single-node call.
708
709   """
710   params = [inst.Dumps(), osdev, swapdev, src_node, src_image]
711   c = Client("instance_os_import", params)
712   c.connect(node)
713   c.run()
714   return c.getresult().get(node, False)
715
716
717 def call_export_list(node_list):
718   """Gets the stored exports list.
719
720   This is a multi-node call.
721
722   """
723   c = Client("export_list", [])
724   c.connect_list(node_list)
725   c.run()
726   result = c.getresult()
727   return result
728
729
730 def call_export_remove(node, export):
731   """Requests removal of a given export.
732
733   This is a single-node call.
734
735   """
736   c = Client("export_remove", [export])
737   c.connect(node)
738   c.run()
739   return c.getresult().get(node, False)
740
741
742 def call_node_leave_cluster(node):
743   """Requests a node to clean the cluster information it has.
744
745   This will remove the configuration information from the ganeti data
746   dir.
747
748   This is a single-node call.
749
750   """
751   c = Client("node_leave_cluster", [])
752   c.connect(node)
753   c.run()
754   return c.getresult().get(node, False)
755
756
757 def call_node_volumes(node_list):
758   """Gets all volumes on node(s).
759
760   This is a multi-node call.
761
762   """
763   c = Client("node_volumes", [])
764   c.connect_list(node_list)
765   c.run()
766   return c.getresult()