Add a new parameter to backend.(Start|Stop)Master
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script to show add a new node to the cluster
23
24 """
25
26 # pylint: disable-msg=C0103
27
28 import os
29 import socket
30 import httplib
31
32 import simplejson
33
34 from ganeti import logger
35 from ganeti import utils
36 from ganeti import errors
37 from ganeti import constants
38 from ganeti import objects
39 from ganeti import ssconf
40
41
42 class NodeController:
43   """Node-handling class.
44
45   For each node that we speak with, we create an instance of this
46   class, so that we have a safe place to store the details of this
47   individual call.
48
49   """
50   def __init__(self, parent, node):
51     self.parent = parent
52     self.node = node
53     self.failed = False
54
55     self.http_conn = hc = httplib.HTTPConnection(node, self.parent.port)
56     try:
57       hc.connect()
58       hc.putrequest('PUT', "/%s" % self.parent.procedure,
59                     skip_accept_encoding=True)
60       hc.putheader('Content-Length', str(len(parent.body)))
61       hc.endheaders()
62       hc.send(parent.body)
63     except socket.error, err:
64       logger.Error("Error connecting to %s: %s" % (node, str(err)))
65       self.failed = True
66
67   def get_response(self):
68     """Try to process the response from the node.
69
70     """
71     if self.failed:
72       # we already failed in connect
73       return False
74     resp = self.http_conn.getresponse()
75     if resp.status != 200:
76       return False
77     try:
78       length = int(resp.getheader('Content-Length', '0'))
79     except ValueError:
80       return False
81     if not length:
82       logger.Error("Zero-length reply from %s" % self.node)
83       return False
84     payload = resp.read(length)
85     unload = simplejson.loads(payload)
86     return unload
87
88
89 class Client:
90   """RPC Client class.
91
92   This class, given a (remote) method name, a list of parameters and a
93   list of nodes, will contact (in parallel) all nodes, and return a
94   dict of results (key: node name, value: result).
95
96   One current bug is that generic failure is still signalled by
97   'False' result, which is not good. This overloading of values can
98   cause bugs.
99
100   """
101   result_set = False
102   result = False
103   allresult = []
104
105   def __init__(self, procedure, args):
106     ss = ssconf.SimpleStore()
107     self.port = ss.GetNodeDaemonPort()
108     self.nodepw = ss.GetNodeDaemonPassword()
109     self.nc = {}
110     self.results = {}
111     self.procedure = procedure
112     self.args = args
113     self.body = simplejson.dumps(args)
114
115   #--- generic connector -------------
116
117   def connect_list(self, node_list):
118     """Add a list of nodes to the target nodes.
119
120     """
121     for node in node_list:
122       self.connect(node)
123
124   def connect(self, connect_node):
125     """Add a node to the target list.
126
127     """
128     self.nc[connect_node] = nc = NodeController(self, connect_node)
129
130   def getresult(self):
131     """Return the results of the call.
132
133     """
134     return self.results
135
136   def run(self):
137     """Wrapper over reactor.run().
138
139     This function simply calls reactor.run() if we have any requests
140     queued, otherwise it does nothing.
141
142     """
143     for node, nc in self.nc.items():
144       self.results[node] = nc.get_response()
145
146
147 def call_volume_list(node_list, vg_name):
148   """Gets the logical volumes present in a given volume group.
149
150   This is a multi-node call.
151
152   """
153   c = Client("volume_list", [vg_name])
154   c.connect_list(node_list)
155   c.run()
156   return c.getresult()
157
158
159 def call_vg_list(node_list):
160   """Gets the volume group list.
161
162   This is a multi-node call.
163
164   """
165   c = Client("vg_list", [])
166   c.connect_list(node_list)
167   c.run()
168   return c.getresult()
169
170
171 def call_bridges_exist(node, bridges_list):
172   """Checks if a node has all the bridges given.
173
174   This method checks if all bridges given in the bridges_list are
175   present on the remote node, so that an instance that uses interfaces
176   on those bridges can be started.
177
178   This is a single-node call.
179
180   """
181   c = Client("bridges_exist", [bridges_list])
182   c.connect(node)
183   c.run()
184   return c.getresult().get(node, False)
185
186
187 def call_instance_start(node, instance, extra_args):
188   """Starts an instance.
189
190   This is a single-node call.
191
192   """
193   c = Client("instance_start", [instance.ToDict(), extra_args])
194   c.connect(node)
195   c.run()
196   return c.getresult().get(node, False)
197
198
199 def call_instance_shutdown(node, instance):
200   """Stops an instance.
201
202   This is a single-node call.
203
204   """
205   c = Client("instance_shutdown", [instance.ToDict()])
206   c.connect(node)
207   c.run()
208   return c.getresult().get(node, False)
209
210
211 def call_instance_migrate(node, instance, target, live):
212   """Migrate an instance.
213
214   This is a single-node call.
215
216   """
217   c = Client("instance_migrate", [instance.name, target, live])
218   c.connect(node)
219   c.run()
220   return c.getresult().get(node, False)
221
222
223 def call_instance_reboot(node, instance, reboot_type, extra_args):
224   """Reboots an instance.
225
226   This is a single-node call.
227
228   """
229   c = Client("instance_reboot", [instance.ToDict(), reboot_type, extra_args])
230   c.connect(node)
231   c.run()
232   return c.getresult().get(node, False)
233
234
235 def call_instance_os_add(node, inst, osdev, swapdev):
236   """Installs an OS on the given instance.
237
238   This is a single-node call.
239
240   """
241   params = [inst.ToDict(), osdev, swapdev]
242   c = Client("instance_os_add", params)
243   c.connect(node)
244   c.run()
245   return c.getresult().get(node, False)
246
247
248 def call_instance_run_rename(node, inst, old_name, osdev, swapdev):
249   """Run the OS rename script for an instance.
250
251   This is a single-node call.
252
253   """
254   params = [inst.ToDict(), old_name, osdev, swapdev]
255   c = Client("instance_run_rename", params)
256   c.connect(node)
257   c.run()
258   return c.getresult().get(node, False)
259
260
261 def call_instance_info(node, instance):
262   """Returns information about a single instance.
263
264   This is a single-node call.
265
266   """
267   c = Client("instance_info", [instance])
268   c.connect(node)
269   c.run()
270   return c.getresult().get(node, False)
271
272
273 def call_all_instances_info(node_list):
274   """Returns information about all instances on a given node.
275
276   This is a single-node call.
277
278   """
279   c = Client("all_instances_info", [])
280   c.connect_list(node_list)
281   c.run()
282   return c.getresult()
283
284
285 def call_instance_list(node_list):
286   """Returns the list of running instances on a given node.
287
288   This is a single-node call.
289
290   """
291   c = Client("instance_list", [])
292   c.connect_list(node_list)
293   c.run()
294   return c.getresult()
295
296
297 def call_node_tcp_ping(node, source, target, port, timeout, live_port_needed):
298   """Do a TcpPing on the remote node
299
300   This is a single-node call.
301   """
302   c = Client("node_tcp_ping", [source, target, port, timeout,
303                                live_port_needed])
304   c.connect(node)
305   c.run()
306   return c.getresult().get(node, False)
307
308
309 def call_node_info(node_list, vg_name):
310   """Return node information.
311
312   This will return memory information and volume group size and free
313   space.
314
315   This is a multi-node call.
316
317   """
318   c = Client("node_info", [vg_name])
319   c.connect_list(node_list)
320   c.run()
321   retux = c.getresult()
322
323   for node_name in retux:
324     ret = retux.get(node_name, False)
325     if type(ret) != dict:
326       logger.Error("could not connect to node %s" % (node_name))
327       ret = {}
328
329     utils.CheckDict(ret,
330                     { 'memory_total' : '-',
331                       'memory_dom0' : '-',
332                       'memory_free' : '-',
333                       'vg_size' : 'node_unreachable',
334                       'vg_free' : '-' },
335                     "call_node_info",
336                     )
337   return retux
338
339
340 def call_node_add(node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
341   """Add a node to the cluster.
342
343   This is a single-node call.
344
345   """
346   params = [dsa, dsapub, rsa, rsapub, ssh, sshpub]
347   c = Client("node_add", params)
348   c.connect(node)
349   c.run()
350   return c.getresult().get(node, False)
351
352
353 def call_node_verify(node_list, checkdict):
354   """Request verification of given parameters.
355
356   This is a multi-node call.
357
358   """
359   c = Client("node_verify", [checkdict])
360   c.connect_list(node_list)
361   c.run()
362   return c.getresult()
363
364
365 def call_node_start_master(node, start_daemons):
366   """Tells a node to activate itself as a master.
367
368   This is a single-node call.
369
370   """
371   c = Client("node_start_master", [start_daemons])
372   c.connect(node)
373   c.run()
374   return c.getresult().get(node, False)
375
376
377 def call_node_stop_master(node, stop_daemons):
378   """Tells a node to demote itself from master status.
379
380   This is a single-node call.
381
382   """
383   c = Client("node_stop_master", [stop_daemons])
384   c.connect(node)
385   c.run()
386   return c.getresult().get(node, False)
387
388
389 def call_version(node_list):
390   """Query node version.
391
392   This is a multi-node call.
393
394   """
395   c = Client("version", [])
396   c.connect_list(node_list)
397   c.run()
398   return c.getresult()
399
400
401 def call_blockdev_create(node, bdev, size, owner, on_primary, info):
402   """Request creation of a given block device.
403
404   This is a single-node call.
405
406   """
407   params = [bdev.ToDict(), size, owner, on_primary, info]
408   c = Client("blockdev_create", params)
409   c.connect(node)
410   c.run()
411   return c.getresult().get(node, False)
412
413
414 def call_blockdev_remove(node, bdev):
415   """Request removal of a given block device.
416
417   This is a single-node call.
418
419   """
420   c = Client("blockdev_remove", [bdev.ToDict()])
421   c.connect(node)
422   c.run()
423   return c.getresult().get(node, False)
424
425
426 def call_blockdev_rename(node, devlist):
427   """Request rename of the given block devices.
428
429   This is a single-node call.
430
431   """
432   params = [(d.ToDict(), uid) for d, uid in devlist]
433   c = Client("blockdev_rename", params)
434   c.connect(node)
435   c.run()
436   return c.getresult().get(node, False)
437
438
439 def call_blockdev_assemble(node, disk, owner, on_primary):
440   """Request assembling of a given block device.
441
442   This is a single-node call.
443
444   """
445   params = [disk.ToDict(), owner, on_primary]
446   c = Client("blockdev_assemble", params)
447   c.connect(node)
448   c.run()
449   return c.getresult().get(node, False)
450
451
452 def call_blockdev_shutdown(node, disk):
453   """Request shutdown of a given block device.
454
455   This is a single-node call.
456
457   """
458   c = Client("blockdev_shutdown", [disk.ToDict()])
459   c.connect(node)
460   c.run()
461   return c.getresult().get(node, False)
462
463
464 def call_blockdev_addchildren(node, bdev, ndevs):
465   """Request adding a list of children to a (mirroring) device.
466
467   This is a single-node call.
468
469   """
470   params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
471   c = Client("blockdev_addchildren", params)
472   c.connect(node)
473   c.run()
474   return c.getresult().get(node, False)
475
476
477 def call_blockdev_removechildren(node, bdev, ndevs):
478   """Request removing a list of children from a (mirroring) device.
479
480   This is a single-node call.
481
482   """
483   params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
484   c = Client("blockdev_removechildren", params)
485   c.connect(node)
486   c.run()
487   return c.getresult().get(node, False)
488
489
490 def call_blockdev_getmirrorstatus(node, disks):
491   """Request status of a (mirroring) device.
492
493   This is a single-node call.
494
495   """
496   params = [dsk.ToDict() for dsk in disks]
497   c = Client("blockdev_getmirrorstatus", params)
498   c.connect(node)
499   c.run()
500   return c.getresult().get(node, False)
501
502
503 def call_blockdev_find(node, disk):
504   """Request identification of a given block device.
505
506   This is a single-node call.
507
508   """
509   c = Client("blockdev_find", [disk.ToDict()])
510   c.connect(node)
511   c.run()
512   return c.getresult().get(node, False)
513
514
515 def call_blockdev_close(node, disks):
516   """Closes the given block devices.
517
518   This is a single-node call.
519
520   """
521   params = [cf.ToDict() for cf in disks]
522   c = Client("blockdev_close", params)
523   c.connect(node)
524   c.run()
525   return c.getresult().get(node, False)
526
527
528 def call_upload_file(node_list, file_name):
529   """Upload a file.
530
531   The node will refuse the operation in case the file is not on the
532   approved file list.
533
534   This is a multi-node call.
535
536   """
537   fh = file(file_name)
538   try:
539     data = fh.read()
540   finally:
541     fh.close()
542   st = os.stat(file_name)
543   params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
544             st.st_atime, st.st_mtime]
545   c = Client("upload_file", params)
546   c.connect_list(node_list)
547   c.run()
548   return c.getresult()
549
550
551 def call_os_diagnose(node_list):
552   """Request a diagnose of OS definitions.
553
554   This is a multi-node call.
555
556   """
557   c = Client("os_diagnose", [])
558   c.connect_list(node_list)
559   c.run()
560   result = c.getresult()
561   new_result = {}
562   for node_name in result:
563     if result[node_name]:
564       nr = [objects.OS.FromDict(oss) for oss in result[node_name]]
565     else:
566       nr = []
567     new_result[node_name] = nr
568   return new_result
569
570
571 def call_os_get(node, name):
572   """Returns an OS definition.
573
574   This is a single-node call.
575
576   """
577   c = Client("os_get", [name])
578   c.connect(node)
579   c.run()
580   result = c.getresult().get(node, False)
581   if isinstance(result, dict):
582     return objects.OS.FromDict(result)
583   else:
584     return result
585
586
587 def call_hooks_runner(node_list, hpath, phase, env):
588   """Call the hooks runner.
589
590   Args:
591     - op: the OpCode instance
592     - env: a dictionary with the environment
593
594   This is a multi-node call.
595
596   """
597   params = [hpath, phase, env]
598   c = Client("hooks_runner", params)
599   c.connect_list(node_list)
600   c.run()
601   result = c.getresult()
602   return result
603
604
605 def call_iallocator_runner(node, name, idata):
606   """Call an iallocator on a remote node
607
608   Args:
609     - name: the iallocator name
610     - input: the json-encoded input string
611
612   This is a single-node call.
613
614   """
615   params = [name, idata]
616   c = Client("iallocator_runner", params)
617   c.connect(node)
618   c.run()
619   result = c.getresult().get(node, False)
620   return result
621
622
623 def call_blockdev_grow(node, cf_bdev, amount):
624   """Request a snapshot of the given block device.
625
626   This is a single-node call.
627
628   """
629   c = Client("blockdev_grow", [cf_bdev.ToDict(), amount])
630   c.connect(node)
631   c.run()
632   return c.getresult().get(node, False)
633
634
635 def call_blockdev_snapshot(node, cf_bdev):
636   """Request a snapshot of the given block device.
637
638   This is a single-node call.
639
640   """
641   c = Client("blockdev_snapshot", [cf_bdev.ToDict()])
642   c.connect(node)
643   c.run()
644   return c.getresult().get(node, False)
645
646
647 def call_snapshot_export(node, snap_bdev, dest_node, instance):
648   """Request the export of a given snapshot.
649
650   This is a single-node call.
651
652   """
653   params = [snap_bdev.ToDict(), dest_node, instance.ToDict()]
654   c = Client("snapshot_export", params)
655   c.connect(node)
656   c.run()
657   return c.getresult().get(node, False)
658
659
660 def call_finalize_export(node, instance, snap_disks):
661   """Request the completion of an export operation.
662
663   This writes the export config file, etc.
664
665   This is a single-node call.
666
667   """
668   flat_disks = []
669   for disk in snap_disks:
670     flat_disks.append(disk.ToDict())
671   params = [instance.ToDict(), flat_disks]
672   c = Client("finalize_export", params)
673   c.connect(node)
674   c.run()
675   return c.getresult().get(node, False)
676
677
678 def call_export_info(node, path):
679   """Queries the export information in a given path.
680
681   This is a single-node call.
682
683   """
684   c = Client("export_info", [path])
685   c.connect(node)
686   c.run()
687   result = c.getresult().get(node, False)
688   if not result:
689     return result
690   return objects.SerializableConfigParser.Loads(str(result))
691
692
693 def call_instance_os_import(node, inst, osdev, swapdev, src_node, src_image):
694   """Request the import of a backup into an instance.
695
696   This is a single-node call.
697
698   """
699   params = [inst.ToDict(), osdev, swapdev, src_node, src_image]
700   c = Client("instance_os_import", params)
701   c.connect(node)
702   c.run()
703   return c.getresult().get(node, False)
704
705
706 def call_export_list(node_list):
707   """Gets the stored exports list.
708
709   This is a multi-node call.
710
711   """
712   c = Client("export_list", [])
713   c.connect_list(node_list)
714   c.run()
715   result = c.getresult()
716   return result
717
718
719 def call_export_remove(node, export):
720   """Requests removal of a given export.
721
722   This is a single-node call.
723
724   """
725   c = Client("export_remove", [export])
726   c.connect(node)
727   c.run()
728   return c.getresult().get(node, False)
729
730
731 def call_node_leave_cluster(node):
732   """Requests a node to clean the cluster information it has.
733
734   This will remove the configuration information from the ganeti data
735   dir.
736
737   This is a single-node call.
738
739   """
740   c = Client("node_leave_cluster", [])
741   c.connect(node)
742   c.run()
743   return c.getresult().get(node, False)
744
745
746 def call_node_volumes(node_list):
747   """Gets all volumes on node(s).
748
749   This is a multi-node call.
750
751   """
752   c = Client("node_volumes", [])
753   c.connect_list(node_list)
754   c.run()
755   return c.getresult()
756
757
758 def call_test_delay(node_list, duration):
759   """Sleep for a fixed time on given node(s).
760
761   This is a multi-node call.
762
763   """
764   c = Client("test_delay", [duration])
765   c.connect_list(node_list)
766   c.run()
767   return c.getresult()
768
769
770 def call_file_storage_dir_create(node, file_storage_dir):
771   """Create the given file storage directory.
772
773   This is a single-node call.
774
775   """
776   c = Client("file_storage_dir_create", [file_storage_dir])
777   c.connect(node)
778   c.run()
779   return c.getresult().get(node, False)
780
781
782 def call_file_storage_dir_remove(node, file_storage_dir):
783   """Remove the given file storage directory.
784
785   This is a single-node call.
786
787   """
788   c = Client("file_storage_dir_remove", [file_storage_dir])
789   c.connect(node)
790   c.run()
791   return c.getresult().get(node, False)
792
793
794 def call_file_storage_dir_rename(node, old_file_storage_dir,
795                                  new_file_storage_dir):
796   """Rename file storage directory.
797
798   This is a single-node call.
799
800   """
801   c = Client("file_storage_dir_rename",
802              [old_file_storage_dir, new_file_storage_dir])
803   c.connect(node)
804   c.run()
805   return c.getresult().get(node, False)