daemon.GenericMain: Don't use dict for SSL paths, improve CLI options
[ganeti-local] / daemons / ganeti-noded
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti node daemon"""
23
24 # functions in this module need to have a given name structure, so:
25 # pylint: disable-msg=C0103
26
27 import os
28 import sys
29 import SocketServer
30 import logging
31 import signal
32
33 from optparse import OptionParser
34
35 from ganeti import backend
36 from ganeti import constants
37 from ganeti import objects
38 from ganeti import errors
39 from ganeti import jstore
40 from ganeti import daemon
41 from ganeti import http
42 from ganeti import utils
43 from ganeti import storage
44
45 import ganeti.http.server
46
47
48 queue_lock = None
49
50
51 def _RequireJobQueueLock(fn):
52   """Decorator for job queue manipulating functions.
53
54   """
55   QUEUE_LOCK_TIMEOUT = 10
56
57   def wrapper(*args, **kwargs):
58     # Locking in exclusive, blocking mode because there could be several
59     # children running at the same time. Waiting up to 10 seconds.
60     queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
61     try:
62       return fn(*args, **kwargs)
63     finally:
64       queue_lock.Unlock()
65
66   return wrapper
67
68
69 class NodeHttpServer(http.server.HttpServer):
70   """The server implementation.
71
72   This class holds all methods exposed over the RPC interface.
73
74   """
75   def __init__(self, *args, **kwargs):
76     http.server.HttpServer.__init__(self, *args, **kwargs)
77     self.noded_pid = os.getpid()
78
79   def HandleRequest(self, req):
80     """Handle a request.
81
82     """
83     if req.request_method.upper() != http.HTTP_PUT:
84       raise http.HttpBadRequest()
85
86     path = req.request_path
87     if path.startswith("/"):
88       path = path[1:]
89
90     method = getattr(self, "perspective_%s" % path, None)
91     if method is None:
92       raise http.HttpNotFound()
93
94     try:
95       rvalue = method(req.request_body)
96       return True, rvalue
97
98     except backend.RPCFail, err:
99       # our custom failure exception; str(err) works fine if the
100       # exception was constructed with a single argument, and in
101       # this case, err.message == err.args[0] == str(err)
102       return (False, str(err))
103     except errors.QuitGanetiException, err:
104       # Tell parent to quit
105       logging.info("Shutting down the node daemon, arguments: %s",
106                    str(err.args))
107       os.kill(self.noded_pid, signal.SIGTERM)
108       # And return the error's arguments, which must be already in
109       # correct tuple format
110       return err.args
111     except Exception, err:
112       logging.exception("Error in RPC call")
113       return False, "Error while executing backend function: %s" % str(err)
114
115   # the new block devices  --------------------------
116
117   @staticmethod
118   def perspective_blockdev_create(params):
119     """Create a block device.
120
121     """
122     bdev_s, size, owner, on_primary, info = params
123     bdev = objects.Disk.FromDict(bdev_s)
124     if bdev is None:
125       raise ValueError("can't unserialize data!")
126     return backend.BlockdevCreate(bdev, size, owner, on_primary, info)
127
128   @staticmethod
129   def perspective_blockdev_remove(params):
130     """Remove a block device.
131
132     """
133     bdev_s = params[0]
134     bdev = objects.Disk.FromDict(bdev_s)
135     return backend.BlockdevRemove(bdev)
136
137   @staticmethod
138   def perspective_blockdev_rename(params):
139     """Remove a block device.
140
141     """
142     devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
143     return backend.BlockdevRename(devlist)
144
145   @staticmethod
146   def perspective_blockdev_assemble(params):
147     """Assemble a block device.
148
149     """
150     bdev_s, owner, on_primary = params
151     bdev = objects.Disk.FromDict(bdev_s)
152     if bdev is None:
153       raise ValueError("can't unserialize data!")
154     return backend.BlockdevAssemble(bdev, owner, on_primary)
155
156   @staticmethod
157   def perspective_blockdev_shutdown(params):
158     """Shutdown a block device.
159
160     """
161     bdev_s = params[0]
162     bdev = objects.Disk.FromDict(bdev_s)
163     if bdev is None:
164       raise ValueError("can't unserialize data!")
165     return backend.BlockdevShutdown(bdev)
166
167   @staticmethod
168   def perspective_blockdev_addchildren(params):
169     """Add a child to a mirror device.
170
171     Note: this is only valid for mirror devices. It's the caller's duty
172     to send a correct disk, otherwise we raise an error.
173
174     """
175     bdev_s, ndev_s = params
176     bdev = objects.Disk.FromDict(bdev_s)
177     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
178     if bdev is None or ndevs.count(None) > 0:
179       raise ValueError("can't unserialize data!")
180     return backend.BlockdevAddchildren(bdev, ndevs)
181
182   @staticmethod
183   def perspective_blockdev_removechildren(params):
184     """Remove a child from a mirror device.
185
186     This is only valid for mirror devices, of course. It's the callers
187     duty to send a correct disk, otherwise we raise an error.
188
189     """
190     bdev_s, ndev_s = params
191     bdev = objects.Disk.FromDict(bdev_s)
192     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
193     if bdev is None or ndevs.count(None) > 0:
194       raise ValueError("can't unserialize data!")
195     return backend.BlockdevRemovechildren(bdev, ndevs)
196
197   @staticmethod
198   def perspective_blockdev_getmirrorstatus(params):
199     """Return the mirror status for a list of disks.
200
201     """
202     disks = [objects.Disk.FromDict(dsk_s)
203              for dsk_s in params]
204     return [status.ToDict()
205             for status in backend.BlockdevGetmirrorstatus(disks)]
206
207   @staticmethod
208   def perspective_blockdev_find(params):
209     """Expose the FindBlockDevice functionality for a disk.
210
211     This will try to find but not activate a disk.
212
213     """
214     disk = objects.Disk.FromDict(params[0])
215
216     result = backend.BlockdevFind(disk)
217     if result is None:
218       return None
219
220     return result.ToDict()
221
222   @staticmethod
223   def perspective_blockdev_snapshot(params):
224     """Create a snapshot device.
225
226     Note that this is only valid for LVM disks, if we get passed
227     something else we raise an exception. The snapshot device can be
228     remove by calling the generic block device remove call.
229
230     """
231     cfbd = objects.Disk.FromDict(params[0])
232     return backend.BlockdevSnapshot(cfbd)
233
234   @staticmethod
235   def perspective_blockdev_grow(params):
236     """Grow a stack of devices.
237
238     """
239     cfbd = objects.Disk.FromDict(params[0])
240     amount = params[1]
241     return backend.BlockdevGrow(cfbd, amount)
242
243   @staticmethod
244   def perspective_blockdev_close(params):
245     """Closes the given block devices.
246
247     """
248     disks = [objects.Disk.FromDict(cf) for cf in params[1]]
249     return backend.BlockdevClose(params[0], disks)
250
251   @staticmethod
252   def perspective_blockdev_getsize(params):
253     """Compute the sizes of the given block devices.
254
255     """
256     disks = [objects.Disk.FromDict(cf) for cf in params[0]]
257     return backend.BlockdevGetsize(disks)
258
259   @staticmethod
260   def perspective_blockdev_export(params):
261     """Compute the sizes of the given block devices.
262
263     """
264     disk = objects.Disk.FromDict(params[0])
265     dest_node, dest_path, cluster_name = params[1:]
266     return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)
267
268   # blockdev/drbd specific methods ----------
269
270   @staticmethod
271   def perspective_drbd_disconnect_net(params):
272     """Disconnects the network connection of drbd disks.
273
274     Note that this is only valid for drbd disks, so the members of the
275     disk list must all be drbd devices.
276
277     """
278     nodes_ip, disks = params
279     disks = [objects.Disk.FromDict(cf) for cf in disks]
280     return backend.DrbdDisconnectNet(nodes_ip, disks)
281
282   @staticmethod
283   def perspective_drbd_attach_net(params):
284     """Attaches the network connection of drbd disks.
285
286     Note that this is only valid for drbd disks, so the members of the
287     disk list must all be drbd devices.
288
289     """
290     nodes_ip, disks, instance_name, multimaster = params
291     disks = [objects.Disk.FromDict(cf) for cf in disks]
292     return backend.DrbdAttachNet(nodes_ip, disks,
293                                      instance_name, multimaster)
294
295   @staticmethod
296   def perspective_drbd_wait_sync(params):
297     """Wait until DRBD disks are synched.
298
299     Note that this is only valid for drbd disks, so the members of the
300     disk list must all be drbd devices.
301
302     """
303     nodes_ip, disks = params
304     disks = [objects.Disk.FromDict(cf) for cf in disks]
305     return backend.DrbdWaitSync(nodes_ip, disks)
306
307   # export/import  --------------------------
308
309   @staticmethod
310   def perspective_snapshot_export(params):
311     """Export a given snapshot.
312
313     """
314     disk = objects.Disk.FromDict(params[0])
315     dest_node = params[1]
316     instance = objects.Instance.FromDict(params[2])
317     cluster_name = params[3]
318     dev_idx = params[4]
319     return backend.ExportSnapshot(disk, dest_node, instance,
320                                   cluster_name, dev_idx)
321
322   @staticmethod
323   def perspective_finalize_export(params):
324     """Expose the finalize export functionality.
325
326     """
327     instance = objects.Instance.FromDict(params[0])
328     snap_disks = [objects.Disk.FromDict(str_data)
329                   for str_data in params[1]]
330     return backend.FinalizeExport(instance, snap_disks)
331
332   @staticmethod
333   def perspective_export_info(params):
334     """Query information about an existing export on this node.
335
336     The given path may not contain an export, in which case we return
337     None.
338
339     """
340     path = params[0]
341     return backend.ExportInfo(path)
342
343   @staticmethod
344   def perspective_export_list(params):
345     """List the available exports on this node.
346
347     Note that as opposed to export_info, which may query data about an
348     export in any path, this only queries the standard Ganeti path
349     (constants.EXPORT_DIR).
350
351     """
352     return backend.ListExports()
353
354   @staticmethod
355   def perspective_export_remove(params):
356     """Remove an export.
357
358     """
359     export = params[0]
360     return backend.RemoveExport(export)
361
362   # volume  --------------------------
363
364   @staticmethod
365   def perspective_lv_list(params):
366     """Query the list of logical volumes in a given volume group.
367
368     """
369     vgname = params[0]
370     return backend.GetVolumeList(vgname)
371
372   @staticmethod
373   def perspective_vg_list(params):
374     """Query the list of volume groups.
375
376     """
377     return backend.ListVolumeGroups()
378
379   # Storage --------------------------
380
381   @staticmethod
382   def perspective_storage_list(params):
383     """Get list of storage units.
384
385     """
386     (su_name, su_args, name, fields) = params
387     return storage.GetStorage(su_name, *su_args).List(name, fields)
388
389   @staticmethod
390   def perspective_storage_modify(params):
391     """Modify a storage unit.
392
393     """
394     (su_name, su_args, name, changes) = params
395     return storage.GetStorage(su_name, *su_args).Modify(name, changes)
396
397   @staticmethod
398   def perspective_storage_execute(params):
399     """Execute an operation on a storage unit.
400
401     """
402     (su_name, su_args, name, op) = params
403     return storage.GetStorage(su_name, *su_args).Execute(name, op)
404
405   # bridge  --------------------------
406
407   @staticmethod
408   def perspective_bridges_exist(params):
409     """Check if all bridges given exist on this node.
410
411     """
412     bridges_list = params[0]
413     return backend.BridgesExist(bridges_list)
414
415   # instance  --------------------------
416
417   @staticmethod
418   def perspective_instance_os_add(params):
419     """Install an OS on a given instance.
420
421     """
422     inst_s = params[0]
423     inst = objects.Instance.FromDict(inst_s)
424     reinstall = params[1]
425     return backend.InstanceOsAdd(inst, reinstall)
426
427   @staticmethod
428   def perspective_instance_run_rename(params):
429     """Runs the OS rename script for an instance.
430
431     """
432     inst_s, old_name = params
433     inst = objects.Instance.FromDict(inst_s)
434     return backend.RunRenameInstance(inst, old_name)
435
436   @staticmethod
437   def perspective_instance_os_import(params):
438     """Run the import function of an OS onto a given instance.
439
440     """
441     inst_s, src_node, src_images, cluster_name = params
442     inst = objects.Instance.FromDict(inst_s)
443     return backend.ImportOSIntoInstance(inst, src_node, src_images,
444                                         cluster_name)
445
446   @staticmethod
447   def perspective_instance_shutdown(params):
448     """Shutdown an instance.
449
450     """
451     instance = objects.Instance.FromDict(params[0])
452     timeout = params[1]
453     return backend.InstanceShutdown(instance, timeout)
454
455   @staticmethod
456   def perspective_instance_start(params):
457     """Start an instance.
458
459     """
460     instance = objects.Instance.FromDict(params[0])
461     return backend.StartInstance(instance)
462
463   @staticmethod
464   def perspective_migration_info(params):
465     """Gather information about an instance to be migrated.
466
467     """
468     instance = objects.Instance.FromDict(params[0])
469     return backend.MigrationInfo(instance)
470
471   @staticmethod
472   def perspective_accept_instance(params):
473     """Prepare the node to accept an instance.
474
475     """
476     instance, info, target = params
477     instance = objects.Instance.FromDict(instance)
478     return backend.AcceptInstance(instance, info, target)
479
480   @staticmethod
481   def perspective_finalize_migration(params):
482     """Finalize the instance migration.
483
484     """
485     instance, info, success = params
486     instance = objects.Instance.FromDict(instance)
487     return backend.FinalizeMigration(instance, info, success)
488
489   @staticmethod
490   def perspective_instance_migrate(params):
491     """Migrates an instance.
492
493     """
494     instance, target, live = params
495     instance = objects.Instance.FromDict(instance)
496     return backend.MigrateInstance(instance, target, live)
497
498   @staticmethod
499   def perspective_instance_reboot(params):
500     """Reboot an instance.
501
502     """
503     instance = objects.Instance.FromDict(params[0])
504     reboot_type = params[1]
505     shutdown_timeout = params[2]
506     return backend.InstanceReboot(instance, reboot_type, shutdown_timeout)
507
508   @staticmethod
509   def perspective_instance_info(params):
510     """Query instance information.
511
512     """
513     return backend.GetInstanceInfo(params[0], params[1])
514
515   @staticmethod
516   def perspective_instance_migratable(params):
517     """Query whether the specified instance can be migrated.
518
519     """
520     instance = objects.Instance.FromDict(params[0])
521     return backend.GetInstanceMigratable(instance)
522
523   @staticmethod
524   def perspective_all_instances_info(params):
525     """Query information about all instances.
526
527     """
528     return backend.GetAllInstancesInfo(params[0])
529
530   @staticmethod
531   def perspective_instance_list(params):
532     """Query the list of running instances.
533
534     """
535     return backend.GetInstanceList(params[0])
536
537   # node --------------------------
538
539   @staticmethod
540   def perspective_node_tcp_ping(params):
541     """Do a TcpPing on the remote node.
542
543     """
544     return utils.TcpPing(params[1], params[2], timeout=params[3],
545                          live_port_needed=params[4], source=params[0])
546
547   @staticmethod
548   def perspective_node_has_ip_address(params):
549     """Checks if a node has the given ip address.
550
551     """
552     return utils.OwnIpAddress(params[0])
553
554   @staticmethod
555   def perspective_node_info(params):
556     """Query node information.
557
558     """
559     vgname, hypervisor_type = params
560     return backend.GetNodeInfo(vgname, hypervisor_type)
561
562   @staticmethod
563   def perspective_node_add(params):
564     """Complete the registration of this node in the cluster.
565
566     """
567     return backend.AddNode(params[0], params[1], params[2],
568                            params[3], params[4], params[5])
569
570   @staticmethod
571   def perspective_node_verify(params):
572     """Run a verify sequence on this node.
573
574     """
575     return backend.VerifyNode(params[0], params[1])
576
577   @staticmethod
578   def perspective_node_start_master(params):
579     """Promote this node to master status.
580
581     """
582     return backend.StartMaster(params[0], params[1])
583
584   @staticmethod
585   def perspective_node_stop_master(params):
586     """Demote this node from master status.
587
588     """
589     return backend.StopMaster(params[0])
590
591   @staticmethod
592   def perspective_node_leave_cluster(params):
593     """Cleanup after leaving a cluster.
594
595     """
596     return backend.LeaveCluster(params[0])
597
598   @staticmethod
599   def perspective_node_volumes(params):
600     """Query the list of all logical volume groups.
601
602     """
603     return backend.NodeVolumes()
604
605   @staticmethod
606   def perspective_node_demote_from_mc(params):
607     """Demote a node from the master candidate role.
608
609     """
610     return backend.DemoteFromMC()
611
612
613   @staticmethod
614   def perspective_node_powercycle(params):
615     """Tries to powercycle the nod.
616
617     """
618     hypervisor_type = params[0]
619     return backend.PowercycleNode(hypervisor_type)
620
621
622   # cluster --------------------------
623
624   @staticmethod
625   def perspective_version(params):
626     """Query version information.
627
628     """
629     return constants.PROTOCOL_VERSION
630
631   @staticmethod
632   def perspective_upload_file(params):
633     """Upload a file.
634
635     Note that the backend implementation imposes strict rules on which
636     files are accepted.
637
638     """
639     return backend.UploadFile(*params)
640
641   @staticmethod
642   def perspective_master_info(params):
643     """Query master information.
644
645     """
646     return backend.GetMasterInfo()
647
648   @staticmethod
649   def perspective_write_ssconf_files(params):
650     """Write ssconf files.
651
652     """
653     (values,) = params
654     return backend.WriteSsconfFiles(values)
655
656   # os -----------------------
657
658   @staticmethod
659   def perspective_os_diagnose(params):
660     """Query detailed information about existing OSes.
661
662     """
663     return backend.DiagnoseOS()
664
665   @staticmethod
666   def perspective_os_get(params):
667     """Query information about a given OS.
668
669     """
670     name = params[0]
671     os_obj = backend.OSFromDisk(name)
672     return os_obj.ToDict()
673
674   # hooks -----------------------
675
676   @staticmethod
677   def perspective_hooks_runner(params):
678     """Run hook scripts.
679
680     """
681     hpath, phase, env = params
682     hr = backend.HooksRunner()
683     return hr.RunHooks(hpath, phase, env)
684
685   # iallocator -----------------
686
687   @staticmethod
688   def perspective_iallocator_runner(params):
689     """Run an iallocator script.
690
691     """
692     name, idata = params
693     iar = backend.IAllocatorRunner()
694     return iar.Run(name, idata)
695
696   # test -----------------------
697
698   @staticmethod
699   def perspective_test_delay(params):
700     """Run test delay.
701
702     """
703     duration = params[0]
704     status, rval = utils.TestDelay(duration)
705     if not status:
706       raise backend.RPCFail(rval)
707     return rval
708
709   # file storage ---------------
710
711   @staticmethod
712   def perspective_file_storage_dir_create(params):
713     """Create the file storage directory.
714
715     """
716     file_storage_dir = params[0]
717     return backend.CreateFileStorageDir(file_storage_dir)
718
719   @staticmethod
720   def perspective_file_storage_dir_remove(params):
721     """Remove the file storage directory.
722
723     """
724     file_storage_dir = params[0]
725     return backend.RemoveFileStorageDir(file_storage_dir)
726
727   @staticmethod
728   def perspective_file_storage_dir_rename(params):
729     """Rename the file storage directory.
730
731     """
732     old_file_storage_dir = params[0]
733     new_file_storage_dir = params[1]
734     return backend.RenameFileStorageDir(old_file_storage_dir,
735                                         new_file_storage_dir)
736
737   # jobs ------------------------
738
739   @staticmethod
740   @_RequireJobQueueLock
741   def perspective_jobqueue_update(params):
742     """Update job queue.
743
744     """
745     (file_name, content) = params
746     return backend.JobQueueUpdate(file_name, content)
747
748   @staticmethod
749   @_RequireJobQueueLock
750   def perspective_jobqueue_purge(params):
751     """Purge job queue.
752
753     """
754     return backend.JobQueuePurge()
755
756   @staticmethod
757   @_RequireJobQueueLock
758   def perspective_jobqueue_rename(params):
759     """Rename a job queue file.
760
761     """
762     # TODO: What if a file fails to rename?
763     return [backend.JobQueueRename(old, new) for old, new in params]
764
765   @staticmethod
766   def perspective_jobqueue_set_drain(params):
767     """Set/unset the queue drain flag.
768
769     """
770     drain_flag = params[0]
771     return backend.JobQueueSetDrainFlag(drain_flag)
772
773
774   # hypervisor ---------------
775
776   @staticmethod
777   def perspective_hypervisor_validate_params(params):
778     """Validate the hypervisor parameters.
779
780     """
781     (hvname, hvparams) = params
782     return backend.ValidateHVParams(hvname, hvparams)
783
784
785 def ExecNoded(options, args):
786   """Main node daemon function, executed with the PID file held.
787
788   """
789   global queue_lock
790
791   # Read SSL certificate
792   if options.ssl:
793     ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
794                                     ssl_cert_path=options.ssl_cert)
795   else:
796     ssl_params = None
797
798   # Prepare job queue
799   queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
800
801   mainloop = daemon.Mainloop()
802   server = NodeHttpServer(mainloop, options.bind_address, options.port,
803                           ssl_params=ssl_params, ssl_verify_peer=True)
804   server.Start()
805   try:
806     mainloop.Run()
807   finally:
808     server.Stop()
809
810
811 def main():
812   """Main function for the node daemon.
813
814   """
815   parser = OptionParser(description="Ganeti node daemon",
816                         usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
817                         version="%%prog (ganeti) %s" %
818                         constants.RELEASE_VERSION)
819   dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
820   dirs.append((constants.LOG_OS_DIR, 0750))
821   dirs.append((constants.LOCK_DIR, 1777))
822   daemon.GenericMain(constants.NODED, parser, dirs, None, ExecNoded,
823                      default_ssl_cert=constants.SSL_CERT_FILE,
824                      default_ssl_key=constants.SSL_CERT_FILE)
825
826
827 if __name__ == '__main__':
828   main()