Add a test opcode that sleeps for a given duration
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script to show add a new node to the cluster
23
24 """
25
26 # pylint: disable-msg=C0103
27
28 import os
29
30 from twisted.internet.pollreactor import PollReactor
31
32 class ReReactor(PollReactor):
33   """A re-startable Reactor implementation.
34
35   """
36   def run(self, installSignalHandlers=1):
37     """Custom run method.
38
39     This is customized run that, before calling Reactor.run, will
40     reinstall the shutdown events and re-create the threadpool in case
41     these are not present (as will happen on the second run of the
42     reactor).
43
44     """
45     if not 'shutdown' in self._eventTriggers:
46       # the shutdown queue has been killed, we are most probably
47       # at the second run, thus recreate the queue
48       self.addSystemEventTrigger('during', 'shutdown', self.crash)
49       self.addSystemEventTrigger('during', 'shutdown', self.disconnectAll)
50     if self.threadpool is not None and self.threadpool.joined == 1:
51       # in case the threadpool has been stopped, re-start it
52       # and add a trigger to stop it at reactor shutdown
53       self.threadpool.start()
54       self.addSystemEventTrigger('during', 'shutdown', self.threadpool.stop)
55
56     return PollReactor.run(self, installSignalHandlers)
57
58
59 import twisted.internet.main
60 twisted.internet.main.installReactor(ReReactor())
61
62 from twisted.spread import pb
63 from twisted.internet import reactor
64 from twisted.cred import credentials
65 from OpenSSL import SSL, crypto
66
67 from ganeti import logger
68 from ganeti import utils
69 from ganeti import errors
70 from ganeti import constants
71 from ganeti import objects
72 from ganeti import ssconf
73
74 class NodeController:
75   """Node-handling class.
76
77   For each node that we speak with, we create an instance of this
78   class, so that we have a safe place to store the details of this
79   individual call.
80
81   """
82   def __init__(self, parent, node):
83     self.parent = parent
84     self.node = node
85
86   def _check_end(self):
87     """Stop the reactor if we got all the results.
88
89     """
90     if len(self.parent.results) == len(self.parent.nc):
91       reactor.stop()
92
93   def cb_call(self, obj):
94     """Callback for successful connect.
95
96     If the connect and login sequence succeeded, we proceed with
97     making the actual call.
98
99     """
100     deferred = obj.callRemote(self.parent.procedure, self.parent.args)
101     deferred.addCallbacks(self.cb_done, self.cb_err2)
102
103   def cb_done(self, result):
104     """Callback for successful call.
105
106     When we receive the result from a call, we check if it was an
107     error and if so we raise a generic RemoteError (we can't pass yet
108     the actual exception over). If there was no error, we store the
109     result.
110
111     """
112     tb, self.parent.results[self.node] = result
113     self._check_end()
114     if tb:
115       raise errors.RemoteError("Remote procedure error calling %s on %s:"
116                                "\n%s" % (self.parent.procedure,
117                                          self.node,
118                                          tb))
119
120   def cb_err1(self, reason):
121     """Error callback for unsuccessful connect.
122
123     """
124     logger.Error("caller_connect: could not connect to remote host %s,"
125                  " reason %s" % (self.node, reason))
126     self.parent.results[self.node] = False
127     self._check_end()
128
129   def cb_err2(self, reason):
130     """Error callback for unsuccessful call.
131
132     This is when the call didn't return anything, not even an error,
133     or when it time out, etc.
134
135     """
136     logger.Error("caller_call: could not call %s on node %s,"
137                  " reason %s" % (self.parent.procedure, self.node, reason))
138     self.parent.results[self.node] = False
139     self._check_end()
140
141
142 class MirrorContextFactory:
143   """Certificate verifier factory.
144
145   This factory creates contexts that verify if the remote end has a
146   specific certificate (i.e. our own certificate).
147
148   The checks we do are that the PEM dump of the certificate is the
149   same as our own and (somewhat redundantly) that the SHA checksum is
150   the same.
151
152   """
153   isClient = 1
154
155   def __init__(self):
156     try:
157       fd = open(constants.SSL_CERT_FILE, 'r')
158       try:
159         data = fd.read(16384)
160       finally:
161         fd.close()
162     except EnvironmentError, err:
163       raise errors.ConfigurationError("missing SSL certificate: %s" %
164                                       str(err))
165     self.mycert = crypto.load_certificate(crypto.FILETYPE_PEM, data)
166     self.mypem = crypto.dump_certificate(crypto.FILETYPE_PEM, self.mycert)
167     self.mydigest = self.mycert.digest('SHA')
168
169   def verifier(self, conn, x509, errno, err_depth, retcode):
170     """Certificate verify method.
171
172     """
173     if self.mydigest != x509.digest('SHA'):
174       return False
175     if crypto.dump_certificate(crypto.FILETYPE_PEM, x509) != self.mypem:
176       return False
177     return True
178
179   def getContext(self):
180     """Context generator.
181
182     """
183     context = SSL.Context(SSL.TLSv1_METHOD)
184     context.set_verify(SSL.VERIFY_PEER, self.verifier)
185     return context
186
187 class Client:
188   """RPC Client class.
189
190   This class, given a (remote) method name, a list of parameters and a
191   list of nodes, will contact (in parallel) all nodes, and return a
192   dict of results (key: node name, value: result).
193
194   One current bug is that generic failure is still signalled by
195   'False' result, which is not good. This overloading of values can
196   cause bugs.
197
198   """
199   result_set = False
200   result = False
201   allresult = []
202
203   def __init__(self, procedure, args):
204     ss = ssconf.SimpleStore()
205     self.port = ss.GetNodeDaemonPort()
206     self.nodepw = ss.GetNodeDaemonPassword()
207     self.nc = {}
208     self.results = {}
209     self.procedure = procedure
210     self.args = args
211
212   #--- generic connector -------------
213
214   def connect_list(self, node_list):
215     """Add a list of nodes to the target nodes.
216
217     """
218     for node in node_list:
219       self.connect(node)
220
221   def connect(self, connect_node):
222     """Add a node to the target list.
223
224     """
225     factory = pb.PBClientFactory()
226     self.nc[connect_node] = nc = NodeController(self, connect_node)
227     reactor.connectSSL(connect_node, self.port, factory,
228                        MirrorContextFactory())
229     #d = factory.getRootObject()
230     d = factory.login(credentials.UsernamePassword("master_node", self.nodepw))
231     d.addCallbacks(nc.cb_call, nc.cb_err1)
232
233   def getresult(self):
234     """Return the results of the call.
235
236     """
237     return self.results
238
239   def run(self):
240     """Wrapper over reactor.run().
241
242     This function simply calls reactor.run() if we have any requests
243     queued, otherwise it does nothing.
244
245     """
246     if self.nc:
247       reactor.run()
248
249
250 def call_volume_list(node_list, vg_name):
251   """Gets the logical volumes present in a given volume group.
252
253   This is a multi-node call.
254
255   """
256   c = Client("volume_list", [vg_name])
257   c.connect_list(node_list)
258   c.run()
259   return c.getresult()
260
261
262 def call_vg_list(node_list):
263   """Gets the volume group list.
264
265   This is a multi-node call.
266
267   """
268   c = Client("vg_list", [])
269   c.connect_list(node_list)
270   c.run()
271   return c.getresult()
272
273
274 def call_bridges_exist(node, bridges_list):
275   """Checks if a node has all the bridges given.
276
277   This method checks if all bridges given in the bridges_list are
278   present on the remote node, so that an instance that uses interfaces
279   on those bridges can be started.
280
281   This is a single-node call.
282
283   """
284   c = Client("bridges_exist", [bridges_list])
285   c.connect(node)
286   c.run()
287   return c.getresult().get(node, False)
288
289
290 def call_instance_start(node, instance, extra_args):
291   """Starts an instance.
292
293   This is a single-node call.
294
295   """
296   c = Client("instance_start", [instance.ToDict(), extra_args])
297   c.connect(node)
298   c.run()
299   return c.getresult().get(node, False)
300
301
302 def call_instance_shutdown(node, instance):
303   """Stops an instance.
304
305   This is a single-node call.
306
307   """
308   c = Client("instance_shutdown", [instance.ToDict()])
309   c.connect(node)
310   c.run()
311   return c.getresult().get(node, False)
312
313
314 def call_instance_reboot(node, instance, reboot_type, extra_args):
315   """Reboots an instance.
316
317   This is a single-node call.
318
319   """
320   c = Client("instance_reboot", [instance.ToDict(), reboot_type, extra_args])
321   c.connect(node)
322   c.run()
323   return c.getresult().get(node, False)
324
325
326 def call_instance_os_add(node, inst, osdev, swapdev):
327   """Installs an OS on the given instance.
328
329   This is a single-node call.
330
331   """
332   params = [inst.ToDict(), osdev, swapdev]
333   c = Client("instance_os_add", params)
334   c.connect(node)
335   c.run()
336   return c.getresult().get(node, False)
337
338
339 def call_instance_run_rename(node, inst, old_name, osdev, swapdev):
340   """Run the OS rename script for an instance.
341
342   This is a single-node call.
343
344   """
345   params = [inst.ToDict(), old_name, osdev, swapdev]
346   c = Client("instance_run_rename", params)
347   c.connect(node)
348   c.run()
349   return c.getresult().get(node, False)
350
351
352 def call_instance_info(node, instance):
353   """Returns information about a single instance.
354
355   This is a single-node call.
356
357   """
358   c = Client("instance_info", [instance])
359   c.connect(node)
360   c.run()
361   return c.getresult().get(node, False)
362
363
364 def call_all_instances_info(node_list):
365   """Returns information about all instances on a given node.
366
367   This is a single-node call.
368
369   """
370   c = Client("all_instances_info", [])
371   c.connect_list(node_list)
372   c.run()
373   return c.getresult()
374
375
376 def call_instance_list(node_list):
377   """Returns the list of running instances on a given node.
378
379   This is a single-node call.
380
381   """
382   c = Client("instance_list", [])
383   c.connect_list(node_list)
384   c.run()
385   return c.getresult()
386
387
388 def call_node_tcp_ping(node, source, target, port, timeout, live_port_needed):
389   """Do a TcpPing on the remote node
390
391   This is a single-node call.
392   """
393   c = Client("node_tcp_ping", [source, target, port, timeout,
394                                live_port_needed])
395   c.connect(node)
396   c.run()
397   return c.getresult().get(node, False)
398
399
400 def call_node_info(node_list, vg_name):
401   """Return node information.
402
403   This will return memory information and volume group size and free
404   space.
405
406   This is a multi-node call.
407
408   """
409   c = Client("node_info", [vg_name])
410   c.connect_list(node_list)
411   c.run()
412   retux = c.getresult()
413
414   for node_name in retux:
415     ret = retux.get(node_name, False)
416     if type(ret) != dict:
417       logger.Error("could not connect to node %s" % (node_name))
418       ret = {}
419
420     utils.CheckDict(ret,
421                     { 'memory_total' : '-',
422                       'memory_dom0' : '-',
423                       'memory_free' : '-',
424                       'vg_size' : 'node_unreachable',
425                       'vg_free' : '-' },
426                     "call_node_info",
427                     )
428   return retux
429
430
431 def call_node_add(node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
432   """Add a node to the cluster.
433
434   This is a single-node call.
435
436   """
437   params = [dsa, dsapub, rsa, rsapub, ssh, sshpub]
438   c = Client("node_add", params)
439   c.connect(node)
440   c.run()
441   return c.getresult().get(node, False)
442
443
444 def call_node_verify(node_list, checkdict):
445   """Request verification of given parameters.
446
447   This is a multi-node call.
448
449   """
450   c = Client("node_verify", [checkdict])
451   c.connect_list(node_list)
452   c.run()
453   return c.getresult()
454
455
456 def call_node_start_master(node):
457   """Tells a node to activate itself as a master.
458
459   This is a single-node call.
460
461   """
462   c = Client("node_start_master", [])
463   c.connect(node)
464   c.run()
465   return c.getresult().get(node, False)
466
467
468 def call_node_stop_master(node):
469   """Tells a node to demote itself from master status.
470
471   This is a single-node call.
472
473   """
474   c = Client("node_stop_master", [])
475   c.connect(node)
476   c.run()
477   return c.getresult().get(node, False)
478
479
480 def call_version(node_list):
481   """Query node version.
482
483   This is a multi-node call.
484
485   """
486   c = Client("version", [])
487   c.connect_list(node_list)
488   c.run()
489   return c.getresult()
490
491
492 def call_blockdev_create(node, bdev, size, owner, on_primary, info):
493   """Request creation of a given block device.
494
495   This is a single-node call.
496
497   """
498   params = [bdev.ToDict(), size, owner, on_primary, info]
499   c = Client("blockdev_create", params)
500   c.connect(node)
501   c.run()
502   return c.getresult().get(node, False)
503
504
505 def call_blockdev_remove(node, bdev):
506   """Request removal of a given block device.
507
508   This is a single-node call.
509
510   """
511   c = Client("blockdev_remove", [bdev.ToDict()])
512   c.connect(node)
513   c.run()
514   return c.getresult().get(node, False)
515
516
517 def call_blockdev_rename(node, devlist):
518   """Request rename of the given block devices.
519
520   This is a single-node call.
521
522   """
523   params = [(d.ToDict(), uid) for d, uid in devlist]
524   c = Client("blockdev_rename", params)
525   c.connect(node)
526   c.run()
527   return c.getresult().get(node, False)
528
529
530 def call_blockdev_assemble(node, disk, owner, on_primary):
531   """Request assembling of a given block device.
532
533   This is a single-node call.
534
535   """
536   params = [disk.ToDict(), owner, on_primary]
537   c = Client("blockdev_assemble", params)
538   c.connect(node)
539   c.run()
540   return c.getresult().get(node, False)
541
542
543 def call_blockdev_shutdown(node, disk):
544   """Request shutdown of a given block device.
545
546   This is a single-node call.
547
548   """
549   c = Client("blockdev_shutdown", [disk.ToDict()])
550   c.connect(node)
551   c.run()
552   return c.getresult().get(node, False)
553
554
555 def call_blockdev_addchildren(node, bdev, ndevs):
556   """Request adding a list of children to a (mirroring) device.
557
558   This is a single-node call.
559
560   """
561   params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
562   c = Client("blockdev_addchildren", params)
563   c.connect(node)
564   c.run()
565   return c.getresult().get(node, False)
566
567
568 def call_blockdev_removechildren(node, bdev, ndevs):
569   """Request removing a list of children from a (mirroring) device.
570
571   This is a single-node call.
572
573   """
574   params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
575   c = Client("blockdev_removechildren", params)
576   c.connect(node)
577   c.run()
578   return c.getresult().get(node, False)
579
580
581 def call_blockdev_getmirrorstatus(node, disks):
582   """Request status of a (mirroring) device.
583
584   This is a single-node call.
585
586   """
587   params = [dsk.ToDict() for dsk in disks]
588   c = Client("blockdev_getmirrorstatus", params)
589   c.connect(node)
590   c.run()
591   return c.getresult().get(node, False)
592
593
594 def call_blockdev_find(node, disk):
595   """Request identification of a given block device.
596
597   This is a single-node call.
598
599   """
600   c = Client("blockdev_find", [disk.ToDict()])
601   c.connect(node)
602   c.run()
603   return c.getresult().get(node, False)
604
605
606 def call_upload_file(node_list, file_name):
607   """Upload a file.
608
609   The node will refuse the operation in case the file is not on the
610   approved file list.
611
612   This is a multi-node call.
613
614   """
615   fh = file(file_name)
616   try:
617     data = fh.read()
618   finally:
619     fh.close()
620   st = os.stat(file_name)
621   params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
622             st.st_atime, st.st_mtime]
623   c = Client("upload_file", params)
624   c.connect_list(node_list)
625   c.run()
626   return c.getresult()
627
628
629 def call_os_diagnose(node_list):
630   """Request a diagnose of OS definitions.
631
632   This is a multi-node call.
633
634   """
635   c = Client("os_diagnose", [])
636   c.connect_list(node_list)
637   c.run()
638   result = c.getresult()
639   new_result = {}
640   for node_name in result:
641     if result[node_name]:
642       nr = [objects.OS.FromDict(oss) for oss in result[node_name]]
643     else:
644       nr = []
645     new_result[node_name] = nr
646   return new_result
647
648
649 def call_os_get(node, name):
650   """Returns an OS definition.
651
652   This is a single-node call.
653
654   """
655   c = Client("os_get", [name])
656   c.connect(node)
657   c.run()
658   result = c.getresult().get(node, False)
659   if isinstance(result, dict):
660     return objects.OS.FromDict(result)
661   else:
662     return result
663
664
665 def call_hooks_runner(node_list, hpath, phase, env):
666   """Call the hooks runner.
667
668   Args:
669     - op: the OpCode instance
670     - env: a dictionary with the environment
671
672   This is a multi-node call.
673
674   """
675   params = [hpath, phase, env]
676   c = Client("hooks_runner", params)
677   c.connect_list(node_list)
678   c.run()
679   result = c.getresult()
680   return result
681
682
683 def call_blockdev_snapshot(node, cf_bdev):
684   """Request a snapshot of the given block device.
685
686   This is a single-node call.
687
688   """
689   c = Client("blockdev_snapshot", [cf_bdev.ToDict()])
690   c.connect(node)
691   c.run()
692   return c.getresult().get(node, False)
693
694
695 def call_snapshot_export(node, snap_bdev, dest_node, instance):
696   """Request the export of a given snapshot.
697
698   This is a single-node call.
699
700   """
701   params = [snap_bdev.ToDict(), dest_node, instance.ToDict()]
702   c = Client("snapshot_export", params)
703   c.connect(node)
704   c.run()
705   return c.getresult().get(node, False)
706
707
708 def call_finalize_export(node, instance, snap_disks):
709   """Request the completion of an export operation.
710
711   This writes the export config file, etc.
712
713   This is a single-node call.
714
715   """
716   flat_disks = []
717   for disk in snap_disks:
718     flat_disks.append(disk.ToDict())
719   params = [instance.ToDict(), flat_disks]
720   c = Client("finalize_export", params)
721   c.connect(node)
722   c.run()
723   return c.getresult().get(node, False)
724
725
726 def call_export_info(node, path):
727   """Queries the export information in a given path.
728
729   This is a single-node call.
730
731   """
732   c = Client("export_info", [path])
733   c.connect(node)
734   c.run()
735   result = c.getresult().get(node, False)
736   if not result:
737     return result
738   return objects.SerializableConfigParser.Loads(result)
739
740
741 def call_instance_os_import(node, inst, osdev, swapdev, src_node, src_image):
742   """Request the import of a backup into an instance.
743
744   This is a single-node call.
745
746   """
747   params = [inst.ToDict(), osdev, swapdev, src_node, src_image]
748   c = Client("instance_os_import", params)
749   c.connect(node)
750   c.run()
751   return c.getresult().get(node, False)
752
753
754 def call_export_list(node_list):
755   """Gets the stored exports list.
756
757   This is a multi-node call.
758
759   """
760   c = Client("export_list", [])
761   c.connect_list(node_list)
762   c.run()
763   result = c.getresult()
764   return result
765
766
767 def call_export_remove(node, export):
768   """Requests removal of a given export.
769
770   This is a single-node call.
771
772   """
773   c = Client("export_remove", [export])
774   c.connect(node)
775   c.run()
776   return c.getresult().get(node, False)
777
778
779 def call_node_leave_cluster(node):
780   """Requests a node to clean the cluster information it has.
781
782   This will remove the configuration information from the ganeti data
783   dir.
784
785   This is a single-node call.
786
787   """
788   c = Client("node_leave_cluster", [])
789   c.connect(node)
790   c.run()
791   return c.getresult().get(node, False)
792
793
794 def call_node_volumes(node_list):
795   """Gets all volumes on node(s).
796
797   This is a multi-node call.
798
799   """
800   c = Client("node_volumes", [])
801   c.connect_list(node_list)
802   c.run()
803   return c.getresult()
804
805
806 def call_test_delay(node_list, duration):
807   """Sleep for a fixed time on given node(s).
808
809   This is a multi-node call.
810
811   """
812   c = Client("test_delay", [duration])
813   c.connect_list(node_list)
814   c.run()
815   return c.getresult()