backend.py change to get cluster name from master
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script to show add a new node to the cluster
23
24 """
25
26 # pylint: disable-msg=C0103
27
28 import os
29 import socket
30 import httplib
31
32 import simplejson
33
34 from ganeti import logger
35 from ganeti import utils
36 from ganeti import objects
37
38
39 class NodeController:
40   """Node-handling class.
41
42   For each node that we speak with, we create an instance of this
43   class, so that we have a safe place to store the details of this
44   individual call.
45
46   """
47   def __init__(self, parent, node):
48     self.parent = parent
49     self.node = node
50     self.failed = False
51
52     self.http_conn = hc = httplib.HTTPConnection(node, self.parent.port)
53     try:
54       hc.connect()
55       hc.putrequest('PUT', "/%s" % self.parent.procedure,
56                     skip_accept_encoding=True)
57       hc.putheader('Content-Length', str(len(parent.body)))
58       hc.endheaders()
59       hc.send(parent.body)
60     except socket.error, err:
61       logger.Error("Error connecting to %s: %s" % (node, str(err)))
62       self.failed = True
63
64   def get_response(self):
65     """Try to process the response from the node.
66
67     """
68     if self.failed:
69       # we already failed in connect
70       return False
71     resp = self.http_conn.getresponse()
72     if resp.status != 200:
73       return False
74     try:
75       length = int(resp.getheader('Content-Length', '0'))
76     except ValueError:
77       return False
78     if not length:
79       logger.Error("Zero-length reply from %s" % self.node)
80       return False
81     payload = resp.read(length)
82     unload = simplejson.loads(payload)
83     return unload
84
85
86 class Client:
87   """RPC Client class.
88
89   This class, given a (remote) method name, a list of parameters and a
90   list of nodes, will contact (in parallel) all nodes, and return a
91   dict of results (key: node name, value: result).
92
93   One current bug is that generic failure is still signalled by
94   'False' result, which is not good. This overloading of values can
95   cause bugs.
96
97   """
98   result_set = False
99   result = False
100   allresult = []
101
102   def __init__(self, procedure, args):
103     self.port = utils.GetNodeDaemonPort()
104     self.nodepw = utils.GetNodeDaemonPassword()
105     self.nc = {}
106     self.results = {}
107     self.procedure = procedure
108     self.args = args
109     self.body = simplejson.dumps(args)
110
111   #--- generic connector -------------
112
113   def connect_list(self, node_list):
114     """Add a list of nodes to the target nodes.
115
116     """
117     for node in node_list:
118       self.connect(node)
119
120   def connect(self, connect_node):
121     """Add a node to the target list.
122
123     """
124     self.nc[connect_node] = nc = NodeController(self, connect_node)
125
126   def getresult(self):
127     """Return the results of the call.
128
129     """
130     return self.results
131
132   def run(self):
133     """Wrapper over reactor.run().
134
135     This function simply calls reactor.run() if we have any requests
136     queued, otherwise it does nothing.
137
138     """
139     for node, nc in self.nc.items():
140       self.results[node] = nc.get_response()
141
142
143 def call_volume_list(node_list, vg_name):
144   """Gets the logical volumes present in a given volume group.
145
146   This is a multi-node call.
147
148   """
149   c = Client("volume_list", [vg_name])
150   c.connect_list(node_list)
151   c.run()
152   return c.getresult()
153
154
155 def call_vg_list(node_list):
156   """Gets the volume group list.
157
158   This is a multi-node call.
159
160   """
161   c = Client("vg_list", [])
162   c.connect_list(node_list)
163   c.run()
164   return c.getresult()
165
166
167 def call_bridges_exist(node, bridges_list):
168   """Checks if a node has all the bridges given.
169
170   This method checks if all bridges given in the bridges_list are
171   present on the remote node, so that an instance that uses interfaces
172   on those bridges can be started.
173
174   This is a single-node call.
175
176   """
177   c = Client("bridges_exist", [bridges_list])
178   c.connect(node)
179   c.run()
180   return c.getresult().get(node, False)
181
182
183 def call_instance_start(node, instance, extra_args):
184   """Starts an instance.
185
186   This is a single-node call.
187
188   """
189   c = Client("instance_start", [instance.ToDict(), extra_args])
190   c.connect(node)
191   c.run()
192   return c.getresult().get(node, False)
193
194
195 def call_instance_shutdown(node, instance):
196   """Stops an instance.
197
198   This is a single-node call.
199
200   """
201   c = Client("instance_shutdown", [instance.ToDict()])
202   c.connect(node)
203   c.run()
204   return c.getresult().get(node, False)
205
206
207 def call_instance_migrate(node, instance, target, live):
208   """Migrate an instance.
209
210   This is a single-node call.
211
212   """
213   c = Client("instance_migrate", [instance.name, target, live])
214   c.connect(node)
215   c.run()
216   return c.getresult().get(node, False)
217
218
219 def call_instance_reboot(node, instance, reboot_type, extra_args):
220   """Reboots an instance.
221
222   This is a single-node call.
223
224   """
225   c = Client("instance_reboot", [instance.ToDict(), reboot_type, extra_args])
226   c.connect(node)
227   c.run()
228   return c.getresult().get(node, False)
229
230
231 def call_instance_os_add(node, inst, osdev, swapdev):
232   """Installs an OS on the given instance.
233
234   This is a single-node call.
235
236   """
237   params = [inst.ToDict(), osdev, swapdev]
238   c = Client("instance_os_add", params)
239   c.connect(node)
240   c.run()
241   return c.getresult().get(node, False)
242
243
244 def call_instance_run_rename(node, inst, old_name, osdev, swapdev):
245   """Run the OS rename script for an instance.
246
247   This is a single-node call.
248
249   """
250   params = [inst.ToDict(), old_name, osdev, swapdev]
251   c = Client("instance_run_rename", params)
252   c.connect(node)
253   c.run()
254   return c.getresult().get(node, False)
255
256
257 def call_instance_info(node, instance):
258   """Returns information about a single instance.
259
260   This is a single-node call.
261
262   """
263   c = Client("instance_info", [instance])
264   c.connect(node)
265   c.run()
266   return c.getresult().get(node, False)
267
268
269 def call_all_instances_info(node_list):
270   """Returns information about all instances on a given node.
271
272   This is a single-node call.
273
274   """
275   c = Client("all_instances_info", [])
276   c.connect_list(node_list)
277   c.run()
278   return c.getresult()
279
280
281 def call_instance_list(node_list):
282   """Returns the list of running instances on a given node.
283
284   This is a single-node call.
285
286   """
287   c = Client("instance_list", [])
288   c.connect_list(node_list)
289   c.run()
290   return c.getresult()
291
292
293 def call_node_tcp_ping(node, source, target, port, timeout, live_port_needed):
294   """Do a TcpPing on the remote node
295
296   This is a single-node call.
297   """
298   c = Client("node_tcp_ping", [source, target, port, timeout,
299                                live_port_needed])
300   c.connect(node)
301   c.run()
302   return c.getresult().get(node, False)
303
304
305 def call_node_info(node_list, vg_name):
306   """Return node information.
307
308   This will return memory information and volume group size and free
309   space.
310
311   This is a multi-node call.
312
313   """
314   c = Client("node_info", [vg_name])
315   c.connect_list(node_list)
316   c.run()
317   retux = c.getresult()
318
319   for node_name in retux:
320     ret = retux.get(node_name, False)
321     if type(ret) != dict:
322       logger.Error("could not connect to node %s" % (node_name))
323       ret = {}
324
325     utils.CheckDict(ret,
326                     { 'memory_total' : '-',
327                       'memory_dom0' : '-',
328                       'memory_free' : '-',
329                       'vg_size' : 'node_unreachable',
330                       'vg_free' : '-' },
331                     "call_node_info",
332                     )
333   return retux
334
335
336 def call_node_add(node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
337   """Add a node to the cluster.
338
339   This is a single-node call.
340
341   """
342   params = [dsa, dsapub, rsa, rsapub, ssh, sshpub]
343   c = Client("node_add", params)
344   c.connect(node)
345   c.run()
346   return c.getresult().get(node, False)
347
348
349 def call_node_verify(node_list, checkdict, cluster_name):
350   """Request verification of given parameters.
351
352   This is a multi-node call.
353
354   """
355   c = Client("node_verify", [checkdict, cluster_name])
356   c.connect_list(node_list)
357   c.run()
358   return c.getresult()
359
360
361 def call_node_start_master(node, start_daemons):
362   """Tells a node to activate itself as a master.
363
364   This is a single-node call.
365
366   """
367   c = Client("node_start_master", [start_daemons])
368   c.connect(node)
369   c.run()
370   return c.getresult().get(node, False)
371
372
373 def call_node_stop_master(node, stop_daemons):
374   """Tells a node to demote itself from master status.
375
376   This is a single-node call.
377
378   """
379   c = Client("node_stop_master", [stop_daemons])
380   c.connect(node)
381   c.run()
382   return c.getresult().get(node, False)
383
384
385 def call_master_info(node_list):
386   """Query master info.
387
388   This is a multi-node call.
389
390   """
391   c = Client("master_info", [])
392   c.connect_list(node_list)
393   c.run()
394   return c.getresult()
395
396
397 def call_version(node_list):
398   """Query node version.
399
400   This is a multi-node call.
401
402   """
403   c = Client("version", [])
404   c.connect_list(node_list)
405   c.run()
406   return c.getresult()
407
408
409 def call_blockdev_create(node, bdev, size, owner, on_primary, info):
410   """Request creation of a given block device.
411
412   This is a single-node call.
413
414   """
415   params = [bdev.ToDict(), size, owner, on_primary, info]
416   c = Client("blockdev_create", params)
417   c.connect(node)
418   c.run()
419   return c.getresult().get(node, False)
420
421
422 def call_blockdev_remove(node, bdev):
423   """Request removal of a given block device.
424
425   This is a single-node call.
426
427   """
428   c = Client("blockdev_remove", [bdev.ToDict()])
429   c.connect(node)
430   c.run()
431   return c.getresult().get(node, False)
432
433
434 def call_blockdev_rename(node, devlist):
435   """Request rename of the given block devices.
436
437   This is a single-node call.
438
439   """
440   params = [(d.ToDict(), uid) for d, uid in devlist]
441   c = Client("blockdev_rename", params)
442   c.connect(node)
443   c.run()
444   return c.getresult().get(node, False)
445
446
447 def call_blockdev_assemble(node, disk, owner, on_primary):
448   """Request assembling of a given block device.
449
450   This is a single-node call.
451
452   """
453   params = [disk.ToDict(), owner, on_primary]
454   c = Client("blockdev_assemble", params)
455   c.connect(node)
456   c.run()
457   return c.getresult().get(node, False)
458
459
460 def call_blockdev_shutdown(node, disk):
461   """Request shutdown of a given block device.
462
463   This is a single-node call.
464
465   """
466   c = Client("blockdev_shutdown", [disk.ToDict()])
467   c.connect(node)
468   c.run()
469   return c.getresult().get(node, False)
470
471
472 def call_blockdev_addchildren(node, bdev, ndevs):
473   """Request adding a list of children to a (mirroring) device.
474
475   This is a single-node call.
476
477   """
478   params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
479   c = Client("blockdev_addchildren", params)
480   c.connect(node)
481   c.run()
482   return c.getresult().get(node, False)
483
484
485 def call_blockdev_removechildren(node, bdev, ndevs):
486   """Request removing a list of children from a (mirroring) device.
487
488   This is a single-node call.
489
490   """
491   params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
492   c = Client("blockdev_removechildren", params)
493   c.connect(node)
494   c.run()
495   return c.getresult().get(node, False)
496
497
498 def call_blockdev_getmirrorstatus(node, disks):
499   """Request status of a (mirroring) device.
500
501   This is a single-node call.
502
503   """
504   params = [dsk.ToDict() for dsk in disks]
505   c = Client("blockdev_getmirrorstatus", params)
506   c.connect(node)
507   c.run()
508   return c.getresult().get(node, False)
509
510
511 def call_blockdev_find(node, disk):
512   """Request identification of a given block device.
513
514   This is a single-node call.
515
516   """
517   c = Client("blockdev_find", [disk.ToDict()])
518   c.connect(node)
519   c.run()
520   return c.getresult().get(node, False)
521
522
523 def call_blockdev_close(node, disks):
524   """Closes the given block devices.
525
526   This is a single-node call.
527
528   """
529   params = [cf.ToDict() for cf in disks]
530   c = Client("blockdev_close", params)
531   c.connect(node)
532   c.run()
533   return c.getresult().get(node, False)
534
535
536 def call_upload_file(node_list, file_name):
537   """Upload a file.
538
539   The node will refuse the operation in case the file is not on the
540   approved file list.
541
542   This is a multi-node call.
543
544   """
545   fh = file(file_name)
546   try:
547     data = fh.read()
548   finally:
549     fh.close()
550   st = os.stat(file_name)
551   params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
552             st.st_atime, st.st_mtime]
553   c = Client("upload_file", params)
554   c.connect_list(node_list)
555   c.run()
556   return c.getresult()
557
558
559 def call_os_diagnose(node_list):
560   """Request a diagnose of OS definitions.
561
562   This is a multi-node call.
563
564   """
565   c = Client("os_diagnose", [])
566   c.connect_list(node_list)
567   c.run()
568   result = c.getresult()
569   new_result = {}
570   for node_name in result:
571     if result[node_name]:
572       nr = [objects.OS.FromDict(oss) for oss in result[node_name]]
573     else:
574       nr = []
575     new_result[node_name] = nr
576   return new_result
577
578
579 def call_os_get(node, name):
580   """Returns an OS definition.
581
582   This is a single-node call.
583
584   """
585   c = Client("os_get", [name])
586   c.connect(node)
587   c.run()
588   result = c.getresult().get(node, False)
589   if isinstance(result, dict):
590     return objects.OS.FromDict(result)
591   else:
592     return result
593
594
595 def call_hooks_runner(node_list, hpath, phase, env):
596   """Call the hooks runner.
597
598   Args:
599     - op: the OpCode instance
600     - env: a dictionary with the environment
601
602   This is a multi-node call.
603
604   """
605   params = [hpath, phase, env]
606   c = Client("hooks_runner", params)
607   c.connect_list(node_list)
608   c.run()
609   result = c.getresult()
610   return result
611
612
613 def call_iallocator_runner(node, name, idata):
614   """Call an iallocator on a remote node
615
616   Args:
617     - name: the iallocator name
618     - input: the json-encoded input string
619
620   This is a single-node call.
621
622   """
623   params = [name, idata]
624   c = Client("iallocator_runner", params)
625   c.connect(node)
626   c.run()
627   result = c.getresult().get(node, False)
628   return result
629
630
631 def call_blockdev_grow(node, cf_bdev, amount):
632   """Request a snapshot of the given block device.
633
634   This is a single-node call.
635
636   """
637   c = Client("blockdev_grow", [cf_bdev.ToDict(), amount])
638   c.connect(node)
639   c.run()
640   return c.getresult().get(node, False)
641
642
643 def call_blockdev_snapshot(node, cf_bdev):
644   """Request a snapshot of the given block device.
645
646   This is a single-node call.
647
648   """
649   c = Client("blockdev_snapshot", [cf_bdev.ToDict()])
650   c.connect(node)
651   c.run()
652   return c.getresult().get(node, False)
653
654
655 def call_snapshot_export(node, snap_bdev, dest_node, instance, cluster_name):
656   """Request the export of a given snapshot.
657
658   This is a single-node call.
659
660   """
661   params = [snap_bdev.ToDict(), dest_node, instance.ToDict(), cluster_name]
662   c = Client("snapshot_export", params)
663   c.connect(node)
664   c.run()
665   return c.getresult().get(node, False)
666
667
668 def call_finalize_export(node, instance, snap_disks):
669   """Request the completion of an export operation.
670
671   This writes the export config file, etc.
672
673   This is a single-node call.
674
675   """
676   flat_disks = []
677   for disk in snap_disks:
678     flat_disks.append(disk.ToDict())
679   params = [instance.ToDict(), flat_disks]
680   c = Client("finalize_export", params)
681   c.connect(node)
682   c.run()
683   return c.getresult().get(node, False)
684
685
686 def call_export_info(node, path):
687   """Queries the export information in a given path.
688
689   This is a single-node call.
690
691   """
692   c = Client("export_info", [path])
693   c.connect(node)
694   c.run()
695   result = c.getresult().get(node, False)
696   if not result:
697     return result
698   return objects.SerializableConfigParser.Loads(str(result))
699
700
701 def call_instance_os_import(node, inst, osdev, swapdev,
702                             src_node, src_image, cluster_name):
703   """Request the import of a backup into an instance.
704
705   This is a single-node call.
706
707   """
708   params = [inst.ToDict(), osdev, swapdev, src_node, src_image, cluster_name]
709   c = Client("instance_os_import", params)
710   c.connect(node)
711   c.run()
712   return c.getresult().get(node, False)
713
714
715 def call_export_list(node_list):
716   """Gets the stored exports list.
717
718   This is a multi-node call.
719
720   """
721   c = Client("export_list", [])
722   c.connect_list(node_list)
723   c.run()
724   result = c.getresult()
725   return result
726
727
728 def call_export_remove(node, export):
729   """Requests removal of a given export.
730
731   This is a single-node call.
732
733   """
734   c = Client("export_remove", [export])
735   c.connect(node)
736   c.run()
737   return c.getresult().get(node, False)
738
739
740 def call_node_leave_cluster(node):
741   """Requests a node to clean the cluster information it has.
742
743   This will remove the configuration information from the ganeti data
744   dir.
745
746   This is a single-node call.
747
748   """
749   c = Client("node_leave_cluster", [])
750   c.connect(node)
751   c.run()
752   return c.getresult().get(node, False)
753
754
755 def call_node_volumes(node_list):
756   """Gets all volumes on node(s).
757
758   This is a multi-node call.
759
760   """
761   c = Client("node_volumes", [])
762   c.connect_list(node_list)
763   c.run()
764   return c.getresult()
765
766
767 def call_test_delay(node_list, duration):
768   """Sleep for a fixed time on given node(s).
769
770   This is a multi-node call.
771
772   """
773   c = Client("test_delay", [duration])
774   c.connect_list(node_list)
775   c.run()
776   return c.getresult()
777
778
779 def call_file_storage_dir_create(node, file_storage_dir):
780   """Create the given file storage directory.
781
782   This is a single-node call.
783
784   """
785   c = Client("file_storage_dir_create", [file_storage_dir])
786   c.connect(node)
787   c.run()
788   return c.getresult().get(node, False)
789
790
791 def call_file_storage_dir_remove(node, file_storage_dir):
792   """Remove the given file storage directory.
793
794   This is a single-node call.
795
796   """
797   c = Client("file_storage_dir_remove", [file_storage_dir])
798   c.connect(node)
799   c.run()
800   return c.getresult().get(node, False)
801
802
803 def call_file_storage_dir_rename(node, old_file_storage_dir,
804                                  new_file_storage_dir):
805   """Rename file storage directory.
806
807   This is a single-node call.
808
809   """
810   c = Client("file_storage_dir_rename",
811              [old_file_storage_dir, new_file_storage_dir])
812   c.connect(node)
813   c.run()
814   return c.getresult().get(node, False)
815
816
817 def call_jobqueue_update(node_list, file_name, content):
818   """Update job queue.
819
820   This is a multi-node call.
821
822   """
823   c = Client("jobqueue_update", [file_name, content])
824   c.connect_list(node_list)
825   c.run()
826   result = c.getresult()
827   return result
828
829
830 def call_jobqueue_purge(node):
831   """Purge job queue.
832
833   This is a single-node call.
834
835   """
836   c = Client("jobqueue_purge", [])
837   c.connect(node)
838   c.run()
839   return c.getresult().get(node, False)
840
841
842 def call_jobqueue_rename(node_list, old, new):
843   """Rename a job queue file.
844
845   This is a multi-node call.
846
847   """
848   c = Client("jobqueue_rename", [old, new])
849   c.connect_list(node_list)
850   c.run()
851   result = c.getresult()
852   return result