Add method to update a disk object size
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script to show add a new node to the cluster
23
24 """
25
26 # pylint: disable-msg=C0103
27
28 import os
29 import socket
30 import httplib
31
32 import simplejson
33
34 from ganeti import logger
35 from ganeti import utils
36 from ganeti import errors
37 from ganeti import constants
38 from ganeti import objects
39 from ganeti import ssconf
40
41
42 class NodeController:
43   """Node-handling class.
44
45   For each node that we speak with, we create an instance of this
46   class, so that we have a safe place to store the details of this
47   individual call.
48
49   """
50   def __init__(self, parent, node):
51     self.parent = parent
52     self.node = node
53     self.failed = False
54
55     self.http_conn = hc = httplib.HTTPConnection(node, self.parent.port)
56     try:
57       hc.connect()
58       hc.putrequest('PUT', "/%s" % self.parent.procedure,
59                     skip_accept_encoding=True)
60       hc.putheader('Content-Length', str(len(parent.body)))
61       hc.endheaders()
62       hc.send(parent.body)
63     except socket.error, err:
64       logger.Error("Error connecting to %s: %s" % (node, str(err)))
65       self.failed = True
66
67   def get_response(self):
68     """Try to process the response from the node.
69
70     """
71     if self.failed:
72       # we already failed in connect
73       return False
74     resp = self.http_conn.getresponse()
75     if resp.status != 200:
76       return False
77     try:
78       length = int(resp.getheader('Content-Length', '0'))
79     except ValueError:
80       return False
81     if not length:
82       logger.Error("Zero-length reply from %s" % self.node)
83       return False
84     payload = resp.read(length)
85     unload = simplejson.loads(payload)
86     return unload
87
88
89 class Client:
90   """RPC Client class.
91
92   This class, given a (remote) method name, a list of parameters and a
93   list of nodes, will contact (in parallel) all nodes, and return a
94   dict of results (key: node name, value: result).
95
96   One current bug is that generic failure is still signalled by
97   'False' result, which is not good. This overloading of values can
98   cause bugs.
99
100   """
101   result_set = False
102   result = False
103   allresult = []
104
105   def __init__(self, procedure, args):
106     ss = ssconf.SimpleStore()
107     self.port = ss.GetNodeDaemonPort()
108     self.nodepw = ss.GetNodeDaemonPassword()
109     self.nc = {}
110     self.results = {}
111     self.procedure = procedure
112     self.args = args
113     self.body = simplejson.dumps(args)
114
115   #--- generic connector -------------
116
117   def connect_list(self, node_list):
118     """Add a list of nodes to the target nodes.
119
120     """
121     for node in node_list:
122       self.connect(node)
123
124   def connect(self, connect_node):
125     """Add a node to the target list.
126
127     """
128     self.nc[connect_node] = nc = NodeController(self, connect_node)
129
130   def getresult(self):
131     """Return the results of the call.
132
133     """
134     return self.results
135
136   def run(self):
137     """Wrapper over reactor.run().
138
139     This function simply calls reactor.run() if we have any requests
140     queued, otherwise it does nothing.
141
142     """
143     for node, nc in self.nc.items():
144       self.results[node] = nc.get_response()
145
146
147 def call_volume_list(node_list, vg_name):
148   """Gets the logical volumes present in a given volume group.
149
150   This is a multi-node call.
151
152   """
153   c = Client("volume_list", [vg_name])
154   c.connect_list(node_list)
155   c.run()
156   return c.getresult()
157
158
159 def call_vg_list(node_list):
160   """Gets the volume group list.
161
162   This is a multi-node call.
163
164   """
165   c = Client("vg_list", [])
166   c.connect_list(node_list)
167   c.run()
168   return c.getresult()
169
170
171 def call_bridges_exist(node, bridges_list):
172   """Checks if a node has all the bridges given.
173
174   This method checks if all bridges given in the bridges_list are
175   present on the remote node, so that an instance that uses interfaces
176   on those bridges can be started.
177
178   This is a single-node call.
179
180   """
181   c = Client("bridges_exist", [bridges_list])
182   c.connect(node)
183   c.run()
184   return c.getresult().get(node, False)
185
186
187 def call_instance_start(node, instance, extra_args):
188   """Starts an instance.
189
190   This is a single-node call.
191
192   """
193   c = Client("instance_start", [instance.ToDict(), extra_args])
194   c.connect(node)
195   c.run()
196   return c.getresult().get(node, False)
197
198
199 def call_instance_shutdown(node, instance):
200   """Stops an instance.
201
202   This is a single-node call.
203
204   """
205   c = Client("instance_shutdown", [instance.ToDict()])
206   c.connect(node)
207   c.run()
208   return c.getresult().get(node, False)
209
210
211 def call_instance_migrate(node, instance, target, live):
212   """Migrate an instance.
213
214   This is a single-node call.
215
216   """
217   c = Client("instance_migrate", [instance.name, target, live])
218   c.connect(node)
219   c.run()
220   return c.getresult().get(node, False)
221
222
223 def call_instance_reboot(node, instance, reboot_type, extra_args):
224   """Reboots an instance.
225
226   This is a single-node call.
227
228   """
229   c = Client("instance_reboot", [instance.ToDict(), reboot_type, extra_args])
230   c.connect(node)
231   c.run()
232   return c.getresult().get(node, False)
233
234
235 def call_instance_os_add(node, inst, osdev, swapdev):
236   """Installs an OS on the given instance.
237
238   This is a single-node call.
239
240   """
241   params = [inst.ToDict(), osdev, swapdev]
242   c = Client("instance_os_add", params)
243   c.connect(node)
244   c.run()
245   return c.getresult().get(node, False)
246
247
248 def call_instance_run_rename(node, inst, old_name, osdev, swapdev):
249   """Run the OS rename script for an instance.
250
251   This is a single-node call.
252
253   """
254   params = [inst.ToDict(), old_name, osdev, swapdev]
255   c = Client("instance_run_rename", params)
256   c.connect(node)
257   c.run()
258   return c.getresult().get(node, False)
259
260
261 def call_instance_info(node, instance):
262   """Returns information about a single instance.
263
264   This is a single-node call.
265
266   """
267   c = Client("instance_info", [instance])
268   c.connect(node)
269   c.run()
270   return c.getresult().get(node, False)
271
272
273 def call_all_instances_info(node_list):
274   """Returns information about all instances on a given node.
275
276   This is a single-node call.
277
278   """
279   c = Client("all_instances_info", [])
280   c.connect_list(node_list)
281   c.run()
282   return c.getresult()
283
284
285 def call_instance_list(node_list):
286   """Returns the list of running instances on a given node.
287
288   This is a single-node call.
289
290   """
291   c = Client("instance_list", [])
292   c.connect_list(node_list)
293   c.run()
294   return c.getresult()
295
296
297 def call_node_tcp_ping(node, source, target, port, timeout, live_port_needed):
298   """Do a TcpPing on the remote node
299
300   This is a single-node call.
301   """
302   c = Client("node_tcp_ping", [source, target, port, timeout,
303                                live_port_needed])
304   c.connect(node)
305   c.run()
306   return c.getresult().get(node, False)
307
308
309 def call_node_info(node_list, vg_name):
310   """Return node information.
311
312   This will return memory information and volume group size and free
313   space.
314
315   This is a multi-node call.
316
317   """
318   c = Client("node_info", [vg_name])
319   c.connect_list(node_list)
320   c.run()
321   retux = c.getresult()
322
323   for node_name in retux:
324     ret = retux.get(node_name, False)
325     if type(ret) != dict:
326       logger.Error("could not connect to node %s" % (node_name))
327       ret = {}
328
329     utils.CheckDict(ret,
330                     { 'memory_total' : '-',
331                       'memory_dom0' : '-',
332                       'memory_free' : '-',
333                       'vg_size' : 'node_unreachable',
334                       'vg_free' : '-' },
335                     "call_node_info",
336                     )
337   return retux
338
339
340 def call_node_add(node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
341   """Add a node to the cluster.
342
343   This is a single-node call.
344
345   """
346   params = [dsa, dsapub, rsa, rsapub, ssh, sshpub]
347   c = Client("node_add", params)
348   c.connect(node)
349   c.run()
350   return c.getresult().get(node, False)
351
352
353 def call_node_verify(node_list, checkdict):
354   """Request verification of given parameters.
355
356   This is a multi-node call.
357
358   """
359   c = Client("node_verify", [checkdict])
360   c.connect_list(node_list)
361   c.run()
362   return c.getresult()
363
364
365 def call_node_start_master(node):
366   """Tells a node to activate itself as a master.
367
368   This is a single-node call.
369
370   """
371   c = Client("node_start_master", [])
372   c.connect(node)
373   c.run()
374   return c.getresult().get(node, False)
375
376
377 def call_node_stop_master(node):
378   """Tells a node to demote itself from master status.
379
380   This is a single-node call.
381
382   """
383   c = Client("node_stop_master", [])
384   c.connect(node)
385   c.run()
386   return c.getresult().get(node, False)
387
388
389 def call_version(node_list):
390   """Query node version.
391
392   This is a multi-node call.
393
394   """
395   c = Client("version", [])
396   c.connect_list(node_list)
397   c.run()
398   return c.getresult()
399
400
401 def call_blockdev_create(node, bdev, size, owner, on_primary, info):
402   """Request creation of a given block device.
403
404   This is a single-node call.
405
406   """
407   params = [bdev.ToDict(), size, owner, on_primary, info]
408   c = Client("blockdev_create", params)
409   c.connect(node)
410   c.run()
411   return c.getresult().get(node, False)
412
413
414 def call_blockdev_remove(node, bdev):
415   """Request removal of a given block device.
416
417   This is a single-node call.
418
419   """
420   c = Client("blockdev_remove", [bdev.ToDict()])
421   c.connect(node)
422   c.run()
423   return c.getresult().get(node, False)
424
425
426 def call_blockdev_rename(node, devlist):
427   """Request rename of the given block devices.
428
429   This is a single-node call.
430
431   """
432   params = [(d.ToDict(), uid) for d, uid in devlist]
433   c = Client("blockdev_rename", params)
434   c.connect(node)
435   c.run()
436   return c.getresult().get(node, False)
437
438
439 def call_blockdev_assemble(node, disk, owner, on_primary):
440   """Request assembling of a given block device.
441
442   This is a single-node call.
443
444   """
445   params = [disk.ToDict(), owner, on_primary]
446   c = Client("blockdev_assemble", params)
447   c.connect(node)
448   c.run()
449   return c.getresult().get(node, False)
450
451
452 def call_blockdev_shutdown(node, disk):
453   """Request shutdown of a given block device.
454
455   This is a single-node call.
456
457   """
458   c = Client("blockdev_shutdown", [disk.ToDict()])
459   c.connect(node)
460   c.run()
461   return c.getresult().get(node, False)
462
463
464 def call_blockdev_addchildren(node, bdev, ndevs):
465   """Request adding a list of children to a (mirroring) device.
466
467   This is a single-node call.
468
469   """
470   params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
471   c = Client("blockdev_addchildren", params)
472   c.connect(node)
473   c.run()
474   return c.getresult().get(node, False)
475
476
477 def call_blockdev_removechildren(node, bdev, ndevs):
478   """Request removing a list of children from a (mirroring) device.
479
480   This is a single-node call.
481
482   """
483   params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
484   c = Client("blockdev_removechildren", params)
485   c.connect(node)
486   c.run()
487   return c.getresult().get(node, False)
488
489
490 def call_blockdev_getmirrorstatus(node, disks):
491   """Request status of a (mirroring) device.
492
493   This is a single-node call.
494
495   """
496   params = [dsk.ToDict() for dsk in disks]
497   c = Client("blockdev_getmirrorstatus", params)
498   c.connect(node)
499   c.run()
500   return c.getresult().get(node, False)
501
502
503 def call_blockdev_find(node, disk):
504   """Request identification of a given block device.
505
506   This is a single-node call.
507
508   """
509   c = Client("blockdev_find", [disk.ToDict()])
510   c.connect(node)
511   c.run()
512   return c.getresult().get(node, False)
513
514
515 def call_upload_file(node_list, file_name):
516   """Upload a file.
517
518   The node will refuse the operation in case the file is not on the
519   approved file list.
520
521   This is a multi-node call.
522
523   """
524   fh = file(file_name)
525   try:
526     data = fh.read()
527   finally:
528     fh.close()
529   st = os.stat(file_name)
530   params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
531             st.st_atime, st.st_mtime]
532   c = Client("upload_file", params)
533   c.connect_list(node_list)
534   c.run()
535   return c.getresult()
536
537
538 def call_os_diagnose(node_list):
539   """Request a diagnose of OS definitions.
540
541   This is a multi-node call.
542
543   """
544   c = Client("os_diagnose", [])
545   c.connect_list(node_list)
546   c.run()
547   result = c.getresult()
548   new_result = {}
549   for node_name in result:
550     if result[node_name]:
551       nr = [objects.OS.FromDict(oss) for oss in result[node_name]]
552     else:
553       nr = []
554     new_result[node_name] = nr
555   return new_result
556
557
558 def call_os_get(node, name):
559   """Returns an OS definition.
560
561   This is a single-node call.
562
563   """
564   c = Client("os_get", [name])
565   c.connect(node)
566   c.run()
567   result = c.getresult().get(node, False)
568   if isinstance(result, dict):
569     return objects.OS.FromDict(result)
570   else:
571     return result
572
573
574 def call_hooks_runner(node_list, hpath, phase, env):
575   """Call the hooks runner.
576
577   Args:
578     - op: the OpCode instance
579     - env: a dictionary with the environment
580
581   This is a multi-node call.
582
583   """
584   params = [hpath, phase, env]
585   c = Client("hooks_runner", params)
586   c.connect_list(node_list)
587   c.run()
588   result = c.getresult()
589   return result
590
591
592 def call_iallocator_runner(node, name, idata):
593   """Call an iallocator on a remote node
594
595   Args:
596     - name: the iallocator name
597     - input: the json-encoded input string
598
599   This is a single-node call.
600
601   """
602   params = [name, idata]
603   c = Client("iallocator_runner", params)
604   c.connect(node)
605   c.run()
606   result = c.getresult().get(node, False)
607   return result
608
609
610 def call_blockdev_grow(node, cf_bdev, amount):
611   """Request a snapshot of the given block device.
612
613   This is a single-node call.
614
615   """
616   c = Client("blockdev_grow", [cf_bdev.ToDict(), amount])
617   c.connect(node)
618   c.run()
619   return c.getresult().get(node, False)
620
621
622 def call_blockdev_snapshot(node, cf_bdev):
623   """Request a snapshot of the given block device.
624
625   This is a single-node call.
626
627   """
628   c = Client("blockdev_snapshot", [cf_bdev.ToDict()])
629   c.connect(node)
630   c.run()
631   return c.getresult().get(node, False)
632
633
634 def call_snapshot_export(node, snap_bdev, dest_node, instance):
635   """Request the export of a given snapshot.
636
637   This is a single-node call.
638
639   """
640   params = [snap_bdev.ToDict(), dest_node, instance.ToDict()]
641   c = Client("snapshot_export", params)
642   c.connect(node)
643   c.run()
644   return c.getresult().get(node, False)
645
646
647 def call_finalize_export(node, instance, snap_disks):
648   """Request the completion of an export operation.
649
650   This writes the export config file, etc.
651
652   This is a single-node call.
653
654   """
655   flat_disks = []
656   for disk in snap_disks:
657     flat_disks.append(disk.ToDict())
658   params = [instance.ToDict(), flat_disks]
659   c = Client("finalize_export", params)
660   c.connect(node)
661   c.run()
662   return c.getresult().get(node, False)
663
664
665 def call_export_info(node, path):
666   """Queries the export information in a given path.
667
668   This is a single-node call.
669
670   """
671   c = Client("export_info", [path])
672   c.connect(node)
673   c.run()
674   result = c.getresult().get(node, False)
675   if not result:
676     return result
677   return objects.SerializableConfigParser.Loads(str(result))
678
679
680 def call_instance_os_import(node, inst, osdev, swapdev, src_node, src_image):
681   """Request the import of a backup into an instance.
682
683   This is a single-node call.
684
685   """
686   params = [inst.ToDict(), osdev, swapdev, src_node, src_image]
687   c = Client("instance_os_import", params)
688   c.connect(node)
689   c.run()
690   return c.getresult().get(node, False)
691
692
693 def call_export_list(node_list):
694   """Gets the stored exports list.
695
696   This is a multi-node call.
697
698   """
699   c = Client("export_list", [])
700   c.connect_list(node_list)
701   c.run()
702   result = c.getresult()
703   return result
704
705
706 def call_export_remove(node, export):
707   """Requests removal of a given export.
708
709   This is a single-node call.
710
711   """
712   c = Client("export_remove", [export])
713   c.connect(node)
714   c.run()
715   return c.getresult().get(node, False)
716
717
718 def call_node_leave_cluster(node):
719   """Requests a node to clean the cluster information it has.
720
721   This will remove the configuration information from the ganeti data
722   dir.
723
724   This is a single-node call.
725
726   """
727   c = Client("node_leave_cluster", [])
728   c.connect(node)
729   c.run()
730   return c.getresult().get(node, False)
731
732
733 def call_node_volumes(node_list):
734   """Gets all volumes on node(s).
735
736   This is a multi-node call.
737
738   """
739   c = Client("node_volumes", [])
740   c.connect_list(node_list)
741   c.run()
742   return c.getresult()
743
744
745 def call_test_delay(node_list, duration):
746   """Sleep for a fixed time on given node(s).
747
748   This is a multi-node call.
749
750   """
751   c = Client("test_delay", [duration])
752   c.connect_list(node_list)
753   c.run()
754   return c.getresult()
755
756
757 def call_file_storage_dir_create(node, file_storage_dir):
758   """Create the given file storage directory.
759
760   This is a single-node call.
761
762   """
763   c = Client("file_storage_dir_create", [file_storage_dir])
764   c.connect(node)
765   c.run()
766   return c.getresult().get(node, False)
767
768
769 def call_file_storage_dir_remove(node, file_storage_dir):
770   """Remove the given file storage directory.
771
772   This is a single-node call.
773
774   """
775   c = Client("file_storage_dir_remove", [file_storage_dir])
776   c.connect(node)
777   c.run()
778   return c.getresult().get(node, False)
779
780
781 def call_file_storage_dir_rename(node, old_file_storage_dir,
782                                  new_file_storage_dir):
783   """Rename file storage directory.
784
785   This is a single-node call.
786
787   """
788   c = Client("file_storage_dir_rename",
789              [old_file_storage_dir, new_file_storage_dir])
790   c.connect(node)
791   c.run()
792   return c.getresult().get(node, False)
793