http.server: No longer enfore JSON encoding for body
[ganeti-local] / daemons / ganeti-noded
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti node daemon"""
23
24 # pylint: disable-msg=C0103,W0142
25
26 # C0103: Functions in this module need to have a given name structure,
27 # and the name of the daemon doesn't match
28
29 # W0142: Used * or ** magic, since we do use it extensively in this
30 # module
31
32 import os
33 import sys
34 import logging
35 import signal
36
37 from optparse import OptionParser
38
39 from ganeti import backend
40 from ganeti import constants
41 from ganeti import objects
42 from ganeti import errors
43 from ganeti import jstore
44 from ganeti import daemon
45 from ganeti import http
46 from ganeti import utils
47 from ganeti import storage
48 from ganeti import serializer
49
50 import ganeti.http.server # pylint: disable-msg=W0611
51
52
53 queue_lock = None
54
55
56 def _RequireJobQueueLock(fn):
57   """Decorator for job queue manipulating functions.
58
59   """
60   QUEUE_LOCK_TIMEOUT = 10
61
62   def wrapper(*args, **kwargs):
63     # Locking in exclusive, blocking mode because there could be several
64     # children running at the same time. Waiting up to 10 seconds.
65     queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
66     try:
67       return fn(*args, **kwargs)
68     finally:
69       queue_lock.Unlock()
70
71   return wrapper
72
73
74 class NodeHttpServer(http.server.HttpServer):
75   """The server implementation.
76
77   This class holds all methods exposed over the RPC interface.
78
79   """
80   # too many public methods, and unused args - all methods get params
81   # due to the API
82   # pylint: disable-msg=R0904,W0613
83   def __init__(self, *args, **kwargs):
84     http.server.HttpServer.__init__(self, *args, **kwargs)
85     self.noded_pid = os.getpid()
86
87   def HandleRequest(self, req):
88     """Handle a request.
89
90     """
91     if req.request_method.upper() != http.HTTP_PUT:
92       raise http.HttpBadRequest()
93
94     path = req.request_path
95     if path.startswith("/"):
96       path = path[1:]
97
98     method = getattr(self, "perspective_%s" % path, None)
99     if method is None:
100       raise http.HttpNotFound()
101
102     try:
103       result = (True, method(serializer.LoadJson(req.request_body)))
104
105     except backend.RPCFail, err:
106       # our custom failure exception; str(err) works fine if the
107       # exception was constructed with a single argument, and in
108       # this case, err.message == err.args[0] == str(err)
109       result = (False, str(err))
110     except errors.QuitGanetiException, err:
111       # Tell parent to quit
112       logging.info("Shutting down the node daemon, arguments: %s",
113                    str(err.args))
114       os.kill(self.noded_pid, signal.SIGTERM)
115       # And return the error's arguments, which must be already in
116       # correct tuple format
117       result = err.args
118     except Exception, err:
119       logging.exception("Error in RPC call")
120       result = (False, "Error while executing backend function: %s" % str(err))
121
122     return serializer.DumpJson(result, indent=False)
123
124   # the new block devices  --------------------------
125
126   @staticmethod
127   def perspective_blockdev_create(params):
128     """Create a block device.
129
130     """
131     bdev_s, size, owner, on_primary, info = params
132     bdev = objects.Disk.FromDict(bdev_s)
133     if bdev is None:
134       raise ValueError("can't unserialize data!")
135     return backend.BlockdevCreate(bdev, size, owner, on_primary, info)
136
137   @staticmethod
138   def perspective_blockdev_remove(params):
139     """Remove a block device.
140
141     """
142     bdev_s = params[0]
143     bdev = objects.Disk.FromDict(bdev_s)
144     return backend.BlockdevRemove(bdev)
145
146   @staticmethod
147   def perspective_blockdev_rename(params):
148     """Remove a block device.
149
150     """
151     devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
152     return backend.BlockdevRename(devlist)
153
154   @staticmethod
155   def perspective_blockdev_assemble(params):
156     """Assemble a block device.
157
158     """
159     bdev_s, owner, on_primary = params
160     bdev = objects.Disk.FromDict(bdev_s)
161     if bdev is None:
162       raise ValueError("can't unserialize data!")
163     return backend.BlockdevAssemble(bdev, owner, on_primary)
164
165   @staticmethod
166   def perspective_blockdev_shutdown(params):
167     """Shutdown a block device.
168
169     """
170     bdev_s = params[0]
171     bdev = objects.Disk.FromDict(bdev_s)
172     if bdev is None:
173       raise ValueError("can't unserialize data!")
174     return backend.BlockdevShutdown(bdev)
175
176   @staticmethod
177   def perspective_blockdev_addchildren(params):
178     """Add a child to a mirror device.
179
180     Note: this is only valid for mirror devices. It's the caller's duty
181     to send a correct disk, otherwise we raise an error.
182
183     """
184     bdev_s, ndev_s = params
185     bdev = objects.Disk.FromDict(bdev_s)
186     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
187     if bdev is None or ndevs.count(None) > 0:
188       raise ValueError("can't unserialize data!")
189     return backend.BlockdevAddchildren(bdev, ndevs)
190
191   @staticmethod
192   def perspective_blockdev_removechildren(params):
193     """Remove a child from a mirror device.
194
195     This is only valid for mirror devices, of course. It's the callers
196     duty to send a correct disk, otherwise we raise an error.
197
198     """
199     bdev_s, ndev_s = params
200     bdev = objects.Disk.FromDict(bdev_s)
201     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
202     if bdev is None or ndevs.count(None) > 0:
203       raise ValueError("can't unserialize data!")
204     return backend.BlockdevRemovechildren(bdev, ndevs)
205
206   @staticmethod
207   def perspective_blockdev_getmirrorstatus(params):
208     """Return the mirror status for a list of disks.
209
210     """
211     disks = [objects.Disk.FromDict(dsk_s)
212              for dsk_s in params]
213     return [status.ToDict()
214             for status in backend.BlockdevGetmirrorstatus(disks)]
215
216   @staticmethod
217   def perspective_blockdev_find(params):
218     """Expose the FindBlockDevice functionality for a disk.
219
220     This will try to find but not activate a disk.
221
222     """
223     disk = objects.Disk.FromDict(params[0])
224
225     result = backend.BlockdevFind(disk)
226     if result is None:
227       return None
228
229     return result.ToDict()
230
231   @staticmethod
232   def perspective_blockdev_snapshot(params):
233     """Create a snapshot device.
234
235     Note that this is only valid for LVM disks, if we get passed
236     something else we raise an exception. The snapshot device can be
237     remove by calling the generic block device remove call.
238
239     """
240     cfbd = objects.Disk.FromDict(params[0])
241     return backend.BlockdevSnapshot(cfbd)
242
243   @staticmethod
244   def perspective_blockdev_grow(params):
245     """Grow a stack of devices.
246
247     """
248     cfbd = objects.Disk.FromDict(params[0])
249     amount = params[1]
250     return backend.BlockdevGrow(cfbd, amount)
251
252   @staticmethod
253   def perspective_blockdev_close(params):
254     """Closes the given block devices.
255
256     """
257     disks = [objects.Disk.FromDict(cf) for cf in params[1]]
258     return backend.BlockdevClose(params[0], disks)
259
260   @staticmethod
261   def perspective_blockdev_getsize(params):
262     """Compute the sizes of the given block devices.
263
264     """
265     disks = [objects.Disk.FromDict(cf) for cf in params[0]]
266     return backend.BlockdevGetsize(disks)
267
268   @staticmethod
269   def perspective_blockdev_export(params):
270     """Compute the sizes of the given block devices.
271
272     """
273     disk = objects.Disk.FromDict(params[0])
274     dest_node, dest_path, cluster_name = params[1:]
275     return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)
276
277   # blockdev/drbd specific methods ----------
278
279   @staticmethod
280   def perspective_drbd_disconnect_net(params):
281     """Disconnects the network connection of drbd disks.
282
283     Note that this is only valid for drbd disks, so the members of the
284     disk list must all be drbd devices.
285
286     """
287     nodes_ip, disks = params
288     disks = [objects.Disk.FromDict(cf) for cf in disks]
289     return backend.DrbdDisconnectNet(nodes_ip, disks)
290
291   @staticmethod
292   def perspective_drbd_attach_net(params):
293     """Attaches the network connection of drbd disks.
294
295     Note that this is only valid for drbd disks, so the members of the
296     disk list must all be drbd devices.
297
298     """
299     nodes_ip, disks, instance_name, multimaster = params
300     disks = [objects.Disk.FromDict(cf) for cf in disks]
301     return backend.DrbdAttachNet(nodes_ip, disks,
302                                      instance_name, multimaster)
303
304   @staticmethod
305   def perspective_drbd_wait_sync(params):
306     """Wait until DRBD disks are synched.
307
308     Note that this is only valid for drbd disks, so the members of the
309     disk list must all be drbd devices.
310
311     """
312     nodes_ip, disks = params
313     disks = [objects.Disk.FromDict(cf) for cf in disks]
314     return backend.DrbdWaitSync(nodes_ip, disks)
315
316   # export/import  --------------------------
317
318   @staticmethod
319   def perspective_snapshot_export(params):
320     """Export a given snapshot.
321
322     """
323     disk = objects.Disk.FromDict(params[0])
324     dest_node = params[1]
325     instance = objects.Instance.FromDict(params[2])
326     cluster_name = params[3]
327     dev_idx = params[4]
328     return backend.ExportSnapshot(disk, dest_node, instance,
329                                   cluster_name, dev_idx)
330
331   @staticmethod
332   def perspective_finalize_export(params):
333     """Expose the finalize export functionality.
334
335     """
336     instance = objects.Instance.FromDict(params[0])
337     snap_disks = [objects.Disk.FromDict(str_data)
338                   for str_data in params[1]]
339     return backend.FinalizeExport(instance, snap_disks)
340
341   @staticmethod
342   def perspective_export_info(params):
343     """Query information about an existing export on this node.
344
345     The given path may not contain an export, in which case we return
346     None.
347
348     """
349     path = params[0]
350     return backend.ExportInfo(path)
351
352   @staticmethod
353   def perspective_export_list(params):
354     """List the available exports on this node.
355
356     Note that as opposed to export_info, which may query data about an
357     export in any path, this only queries the standard Ganeti path
358     (constants.EXPORT_DIR).
359
360     """
361     return backend.ListExports()
362
363   @staticmethod
364   def perspective_export_remove(params):
365     """Remove an export.
366
367     """
368     export = params[0]
369     return backend.RemoveExport(export)
370
371   # volume  --------------------------
372
373   @staticmethod
374   def perspective_lv_list(params):
375     """Query the list of logical volumes in a given volume group.
376
377     """
378     vgname = params[0]
379     return backend.GetVolumeList(vgname)
380
381   @staticmethod
382   def perspective_vg_list(params):
383     """Query the list of volume groups.
384
385     """
386     return backend.ListVolumeGroups()
387
388   # Storage --------------------------
389
390   @staticmethod
391   def perspective_storage_list(params):
392     """Get list of storage units.
393
394     """
395     (su_name, su_args, name, fields) = params
396     return storage.GetStorage(su_name, *su_args).List(name, fields)
397
398   @staticmethod
399   def perspective_storage_modify(params):
400     """Modify a storage unit.
401
402     """
403     (su_name, su_args, name, changes) = params
404     return storage.GetStorage(su_name, *su_args).Modify(name, changes)
405
406   @staticmethod
407   def perspective_storage_execute(params):
408     """Execute an operation on a storage unit.
409
410     """
411     (su_name, su_args, name, op) = params
412     return storage.GetStorage(su_name, *su_args).Execute(name, op)
413
414   # bridge  --------------------------
415
416   @staticmethod
417   def perspective_bridges_exist(params):
418     """Check if all bridges given exist on this node.
419
420     """
421     bridges_list = params[0]
422     return backend.BridgesExist(bridges_list)
423
424   # instance  --------------------------
425
426   @staticmethod
427   def perspective_instance_os_add(params):
428     """Install an OS on a given instance.
429
430     """
431     inst_s = params[0]
432     inst = objects.Instance.FromDict(inst_s)
433     reinstall = params[1]
434     return backend.InstanceOsAdd(inst, reinstall)
435
436   @staticmethod
437   def perspective_instance_run_rename(params):
438     """Runs the OS rename script for an instance.
439
440     """
441     inst_s, old_name = params
442     inst = objects.Instance.FromDict(inst_s)
443     return backend.RunRenameInstance(inst, old_name)
444
445   @staticmethod
446   def perspective_instance_os_import(params):
447     """Run the import function of an OS onto a given instance.
448
449     """
450     inst_s, src_node, src_images, cluster_name = params
451     inst = objects.Instance.FromDict(inst_s)
452     return backend.ImportOSIntoInstance(inst, src_node, src_images,
453                                         cluster_name)
454
455   @staticmethod
456   def perspective_instance_shutdown(params):
457     """Shutdown an instance.
458
459     """
460     instance = objects.Instance.FromDict(params[0])
461     timeout = params[1]
462     return backend.InstanceShutdown(instance, timeout)
463
464   @staticmethod
465   def perspective_instance_start(params):
466     """Start an instance.
467
468     """
469     instance = objects.Instance.FromDict(params[0])
470     return backend.StartInstance(instance)
471
472   @staticmethod
473   def perspective_migration_info(params):
474     """Gather information about an instance to be migrated.
475
476     """
477     instance = objects.Instance.FromDict(params[0])
478     return backend.MigrationInfo(instance)
479
480   @staticmethod
481   def perspective_accept_instance(params):
482     """Prepare the node to accept an instance.
483
484     """
485     instance, info, target = params
486     instance = objects.Instance.FromDict(instance)
487     return backend.AcceptInstance(instance, info, target)
488
489   @staticmethod
490   def perspective_finalize_migration(params):
491     """Finalize the instance migration.
492
493     """
494     instance, info, success = params
495     instance = objects.Instance.FromDict(instance)
496     return backend.FinalizeMigration(instance, info, success)
497
498   @staticmethod
499   def perspective_instance_migrate(params):
500     """Migrates an instance.
501
502     """
503     instance, target, live = params
504     instance = objects.Instance.FromDict(instance)
505     return backend.MigrateInstance(instance, target, live)
506
507   @staticmethod
508   def perspective_instance_reboot(params):
509     """Reboot an instance.
510
511     """
512     instance = objects.Instance.FromDict(params[0])
513     reboot_type = params[1]
514     shutdown_timeout = params[2]
515     return backend.InstanceReboot(instance, reboot_type, shutdown_timeout)
516
517   @staticmethod
518   def perspective_instance_info(params):
519     """Query instance information.
520
521     """
522     return backend.GetInstanceInfo(params[0], params[1])
523
524   @staticmethod
525   def perspective_instance_migratable(params):
526     """Query whether the specified instance can be migrated.
527
528     """
529     instance = objects.Instance.FromDict(params[0])
530     return backend.GetInstanceMigratable(instance)
531
532   @staticmethod
533   def perspective_all_instances_info(params):
534     """Query information about all instances.
535
536     """
537     return backend.GetAllInstancesInfo(params[0])
538
539   @staticmethod
540   def perspective_instance_list(params):
541     """Query the list of running instances.
542
543     """
544     return backend.GetInstanceList(params[0])
545
546   # node --------------------------
547
548   @staticmethod
549   def perspective_node_tcp_ping(params):
550     """Do a TcpPing on the remote node.
551
552     """
553     return utils.TcpPing(params[1], params[2], timeout=params[3],
554                          live_port_needed=params[4], source=params[0])
555
556   @staticmethod
557   def perspective_node_has_ip_address(params):
558     """Checks if a node has the given ip address.
559
560     """
561     return utils.OwnIpAddress(params[0])
562
563   @staticmethod
564   def perspective_node_info(params):
565     """Query node information.
566
567     """
568     vgname, hypervisor_type = params
569     return backend.GetNodeInfo(vgname, hypervisor_type)
570
571   @staticmethod
572   def perspective_node_add(params):
573     """Complete the registration of this node in the cluster.
574
575     """
576     return backend.AddNode(params[0], params[1], params[2],
577                            params[3], params[4], params[5])
578
579   @staticmethod
580   def perspective_node_verify(params):
581     """Run a verify sequence on this node.
582
583     """
584     return backend.VerifyNode(params[0], params[1])
585
586   @staticmethod
587   def perspective_node_start_master(params):
588     """Promote this node to master status.
589
590     """
591     return backend.StartMaster(params[0], params[1])
592
593   @staticmethod
594   def perspective_node_stop_master(params):
595     """Demote this node from master status.
596
597     """
598     return backend.StopMaster(params[0])
599
600   @staticmethod
601   def perspective_node_leave_cluster(params):
602     """Cleanup after leaving a cluster.
603
604     """
605     return backend.LeaveCluster(params[0])
606
607   @staticmethod
608   def perspective_node_volumes(params):
609     """Query the list of all logical volume groups.
610
611     """
612     return backend.NodeVolumes()
613
614   @staticmethod
615   def perspective_node_demote_from_mc(params):
616     """Demote a node from the master candidate role.
617
618     """
619     return backend.DemoteFromMC()
620
621
622   @staticmethod
623   def perspective_node_powercycle(params):
624     """Tries to powercycle the nod.
625
626     """
627     hypervisor_type = params[0]
628     return backend.PowercycleNode(hypervisor_type)
629
630
631   # cluster --------------------------
632
633   @staticmethod
634   def perspective_version(params):
635     """Query version information.
636
637     """
638     return constants.PROTOCOL_VERSION
639
640   @staticmethod
641   def perspective_upload_file(params):
642     """Upload a file.
643
644     Note that the backend implementation imposes strict rules on which
645     files are accepted.
646
647     """
648     return backend.UploadFile(*params)
649
650   @staticmethod
651   def perspective_master_info(params):
652     """Query master information.
653
654     """
655     return backend.GetMasterInfo()
656
657   @staticmethod
658   def perspective_write_ssconf_files(params):
659     """Write ssconf files.
660
661     """
662     (values,) = params
663     return backend.WriteSsconfFiles(values)
664
665   # os -----------------------
666
667   @staticmethod
668   def perspective_os_diagnose(params):
669     """Query detailed information about existing OSes.
670
671     """
672     return backend.DiagnoseOS()
673
674   @staticmethod
675   def perspective_os_get(params):
676     """Query information about a given OS.
677
678     """
679     name = params[0]
680     os_obj = backend.OSFromDisk(name)
681     return os_obj.ToDict()
682
683   # hooks -----------------------
684
685   @staticmethod
686   def perspective_hooks_runner(params):
687     """Run hook scripts.
688
689     """
690     hpath, phase, env = params
691     hr = backend.HooksRunner()
692     return hr.RunHooks(hpath, phase, env)
693
694   # iallocator -----------------
695
696   @staticmethod
697   def perspective_iallocator_runner(params):
698     """Run an iallocator script.
699
700     """
701     name, idata = params
702     iar = backend.IAllocatorRunner()
703     return iar.Run(name, idata)
704
705   # test -----------------------
706
707   @staticmethod
708   def perspective_test_delay(params):
709     """Run test delay.
710
711     """
712     duration = params[0]
713     status, rval = utils.TestDelay(duration)
714     if not status:
715       raise backend.RPCFail(rval)
716     return rval
717
718   # file storage ---------------
719
720   @staticmethod
721   def perspective_file_storage_dir_create(params):
722     """Create the file storage directory.
723
724     """
725     file_storage_dir = params[0]
726     return backend.CreateFileStorageDir(file_storage_dir)
727
728   @staticmethod
729   def perspective_file_storage_dir_remove(params):
730     """Remove the file storage directory.
731
732     """
733     file_storage_dir = params[0]
734     return backend.RemoveFileStorageDir(file_storage_dir)
735
736   @staticmethod
737   def perspective_file_storage_dir_rename(params):
738     """Rename the file storage directory.
739
740     """
741     old_file_storage_dir = params[0]
742     new_file_storage_dir = params[1]
743     return backend.RenameFileStorageDir(old_file_storage_dir,
744                                         new_file_storage_dir)
745
746   # jobs ------------------------
747
748   @staticmethod
749   @_RequireJobQueueLock
750   def perspective_jobqueue_update(params):
751     """Update job queue.
752
753     """
754     (file_name, content) = params
755     return backend.JobQueueUpdate(file_name, content)
756
757   @staticmethod
758   @_RequireJobQueueLock
759   def perspective_jobqueue_purge(params):
760     """Purge job queue.
761
762     """
763     return backend.JobQueuePurge()
764
765   @staticmethod
766   @_RequireJobQueueLock
767   def perspective_jobqueue_rename(params):
768     """Rename a job queue file.
769
770     """
771     # TODO: What if a file fails to rename?
772     return [backend.JobQueueRename(old, new) for old, new in params]
773
774   @staticmethod
775   def perspective_jobqueue_set_drain(params):
776     """Set/unset the queue drain flag.
777
778     """
779     drain_flag = params[0]
780     return backend.JobQueueSetDrainFlag(drain_flag)
781
782
783   # hypervisor ---------------
784
785   @staticmethod
786   def perspective_hypervisor_validate_params(params):
787     """Validate the hypervisor parameters.
788
789     """
790     (hvname, hvparams) = params
791     return backend.ValidateHVParams(hvname, hvparams)
792
793
794 def CheckNoded(_, args):
795   """Initial checks whether to run or exit with a failure.
796
797   """
798   if args: # noded doesn't take any arguments
799     print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
800                           sys.argv[0])
801     sys.exit(constants.EXIT_FAILURE)
802
803
804 def ExecNoded(options, _):
805   """Main node daemon function, executed with the PID file held.
806
807   """
808   global queue_lock # pylint: disable-msg=W0603
809
810   # Read SSL certificate
811   if options.ssl:
812     ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
813                                     ssl_cert_path=options.ssl_cert)
814   else:
815     ssl_params = None
816
817   # Prepare job queue
818   queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
819
820   mainloop = daemon.Mainloop()
821   server = NodeHttpServer(mainloop, options.bind_address, options.port,
822                           ssl_params=ssl_params, ssl_verify_peer=True)
823   server.Start()
824   try:
825     mainloop.Run()
826   finally:
827     server.Stop()
828
829
830 def main():
831   """Main function for the node daemon.
832
833   """
834   parser = OptionParser(description="Ganeti node daemon",
835                         usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
836                         version="%%prog (ganeti) %s" %
837                         constants.RELEASE_VERSION)
838   dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
839   dirs.append((constants.LOG_OS_DIR, 0750))
840   dirs.append((constants.LOCK_DIR, 1777))
841   daemon.GenericMain(constants.NODED, parser, dirs, CheckNoded, ExecNoded,
842                      default_ssl_cert=constants.SSL_CERT_FILE,
843                      default_ssl_key=constants.SSL_CERT_FILE)
844
845
846 if __name__ == '__main__':
847   main()