1367f226ef133fcf33c1acb069265ff9ffcce28c
[ganeti-local] / daemons / ganeti-noded
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti node daemon"""
23
24 # functions in this module need to have a given name structure, so:
25 # pylint: disable-msg=C0103
26
27 import os
28 import sys
29 import traceback
30 import SocketServer
31 import errno
32 import logging
33 import signal
34
35 from optparse import OptionParser
36
37 from ganeti import backend
38 from ganeti import constants
39 from ganeti import objects
40 from ganeti import errors
41 from ganeti import jstore
42 from ganeti import daemon
43 from ganeti import http
44 from ganeti import utils
45
46 import ganeti.http.server
47
48
49 queue_lock = None
50
51
52 def _RequireJobQueueLock(fn):
53   """Decorator for job queue manipulating functions.
54
55   """
56   QUEUE_LOCK_TIMEOUT = 10
57
58   def wrapper(*args, **kwargs):
59     # Locking in exclusive, blocking mode because there could be several
60     # children running at the same time. Waiting up to 10 seconds.
61     queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
62     try:
63       return fn(*args, **kwargs)
64     finally:
65       queue_lock.Unlock()
66
67   return wrapper
68
69
70 class NodeHttpServer(http.server.HttpServer):
71   """The server implementation.
72
73   This class holds all methods exposed over the RPC interface.
74
75   """
76   def __init__(self, *args, **kwargs):
77     http.server.HttpServer.__init__(self, *args, **kwargs)
78     self.noded_pid = os.getpid()
79
80   def HandleRequest(self, req):
81     """Handle a request.
82
83     """
84     if req.request_method.upper() != http.HTTP_PUT:
85       raise http.HttpBadRequest()
86
87     path = req.request_path
88     if path.startswith("/"):
89       path = path[1:]
90
91     method = getattr(self, "perspective_%s" % path, None)
92     if method is None:
93       raise http.HttpNotFound()
94
95     try:
96       try:
97         return method(req.request_body)
98       except backend.RPCFail, err:
99         # our custom failure exception; str(err) works fine if the
100         # exception was constructed with a single argument, and in
101         # this case, err.message == err.args[0] == str(err)
102         return (False, str(err))
103       except:
104         logging.exception("Error in RPC call")
105         raise
106     except errors.QuitGanetiException, err:
107       # Tell parent to quit
108       os.kill(self.noded_pid, signal.SIGTERM)
109
110   # the new block devices  --------------------------
111
112   @staticmethod
113   def perspective_blockdev_create(params):
114     """Create a block device.
115
116     """
117     bdev_s, size, owner, on_primary, info = params
118     bdev = objects.Disk.FromDict(bdev_s)
119     if bdev is None:
120       raise ValueError("can't unserialize data!")
121     return backend.BlockdevCreate(bdev, size, owner, on_primary, info)
122
123   @staticmethod
124   def perspective_blockdev_remove(params):
125     """Remove a block device.
126
127     """
128     bdev_s = params[0]
129     bdev = objects.Disk.FromDict(bdev_s)
130     return backend.BlockdevRemove(bdev)
131
132   @staticmethod
133   def perspective_blockdev_rename(params):
134     """Remove a block device.
135
136     """
137     devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
138     return backend.BlockdevRename(devlist)
139
140   @staticmethod
141   def perspective_blockdev_assemble(params):
142     """Assemble a block device.
143
144     """
145     bdev_s, owner, on_primary = params
146     bdev = objects.Disk.FromDict(bdev_s)
147     if bdev is None:
148       raise ValueError("can't unserialize data!")
149     return backend.BlockdevAssemble(bdev, owner, on_primary)
150
151   @staticmethod
152   def perspective_blockdev_shutdown(params):
153     """Shutdown a block device.
154
155     """
156     bdev_s = params[0]
157     bdev = objects.Disk.FromDict(bdev_s)
158     if bdev is None:
159       raise ValueError("can't unserialize data!")
160     return backend.BlockdevShutdown(bdev)
161
162   @staticmethod
163   def perspective_blockdev_addchildren(params):
164     """Add a child to a mirror device.
165
166     Note: this is only valid for mirror devices. It's the caller's duty
167     to send a correct disk, otherwise we raise an error.
168
169     """
170     bdev_s, ndev_s = params
171     bdev = objects.Disk.FromDict(bdev_s)
172     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
173     if bdev is None or ndevs.count(None) > 0:
174       raise ValueError("can't unserialize data!")
175     return backend.BlockdevAddchildren(bdev, ndevs)
176
177   @staticmethod
178   def perspective_blockdev_removechildren(params):
179     """Remove a child from a mirror device.
180
181     This is only valid for mirror devices, of course. It's the callers
182     duty to send a correct disk, otherwise we raise an error.
183
184     """
185     bdev_s, ndev_s = params
186     bdev = objects.Disk.FromDict(bdev_s)
187     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
188     if bdev is None or ndevs.count(None) > 0:
189       raise ValueError("can't unserialize data!")
190     return backend.BlockdevRemovechildren(bdev, ndevs)
191
192   @staticmethod
193   def perspective_blockdev_getmirrorstatus(params):
194     """Return the mirror status for a list of disks.
195
196     """
197     disks = [objects.Disk.FromDict(dsk_s)
198             for dsk_s in params]
199     return backend.BlockdevGetmirrorstatus(disks)
200
201   @staticmethod
202   def perspective_blockdev_find(params):
203     """Expose the FindBlockDevice functionality for a disk.
204
205     This will try to find but not activate a disk.
206
207     """
208     disk = objects.Disk.FromDict(params[0])
209     return backend.BlockdevFind(disk)
210
211   @staticmethod
212   def perspective_blockdev_snapshot(params):
213     """Create a snapshot device.
214
215     Note that this is only valid for LVM disks, if we get passed
216     something else we raise an exception. The snapshot device can be
217     remove by calling the generic block device remove call.
218
219     """
220     cfbd = objects.Disk.FromDict(params[0])
221     return backend.BlockdevSnapshot(cfbd)
222
223   @staticmethod
224   def perspective_blockdev_grow(params):
225     """Grow a stack of devices.
226
227     """
228     cfbd = objects.Disk.FromDict(params[0])
229     amount = params[1]
230     return backend.BlockdevGrow(cfbd, amount)
231
232   @staticmethod
233   def perspective_blockdev_close(params):
234     """Closes the given block devices.
235
236     """
237     disks = [objects.Disk.FromDict(cf) for cf in params[1]]
238     return backend.BlockdevClose(params[0], disks)
239
240   # blockdev/drbd specific methods ----------
241
242   @staticmethod
243   def perspective_drbd_disconnect_net(params):
244     """Disconnects the network connection of drbd disks.
245
246     Note that this is only valid for drbd disks, so the members of the
247     disk list must all be drbd devices.
248
249     """
250     nodes_ip, disks = params
251     disks = [objects.Disk.FromDict(cf) for cf in disks]
252     return backend.DrbdDisconnectNet(nodes_ip, disks)
253
254   @staticmethod
255   def perspective_drbd_attach_net(params):
256     """Attaches the network connection of drbd disks.
257
258     Note that this is only valid for drbd disks, so the members of the
259     disk list must all be drbd devices.
260
261     """
262     nodes_ip, disks, instance_name, multimaster = params
263     disks = [objects.Disk.FromDict(cf) for cf in disks]
264     return backend.DrbdAttachNet(nodes_ip, disks,
265                                      instance_name, multimaster)
266
267   @staticmethod
268   def perspective_drbd_wait_sync(params):
269     """Wait until DRBD disks are synched.
270
271     Note that this is only valid for drbd disks, so the members of the
272     disk list must all be drbd devices.
273
274     """
275     nodes_ip, disks = params
276     disks = [objects.Disk.FromDict(cf) for cf in disks]
277     return backend.DrbdWaitSync(nodes_ip, disks)
278
279   # export/import  --------------------------
280
281   @staticmethod
282   def perspective_snapshot_export(params):
283     """Export a given snapshot.
284
285     """
286     disk = objects.Disk.FromDict(params[0])
287     dest_node = params[1]
288     instance = objects.Instance.FromDict(params[2])
289     cluster_name = params[3]
290     dev_idx = params[4]
291     return backend.ExportSnapshot(disk, dest_node, instance,
292                                   cluster_name, dev_idx)
293
294   @staticmethod
295   def perspective_finalize_export(params):
296     """Expose the finalize export functionality.
297
298     """
299     instance = objects.Instance.FromDict(params[0])
300     snap_disks = [objects.Disk.FromDict(str_data)
301                   for str_data in params[1]]
302     return backend.FinalizeExport(instance, snap_disks)
303
304   @staticmethod
305   def perspective_export_info(params):
306     """Query information about an existing export on this node.
307
308     The given path may not contain an export, in which case we return
309     None.
310
311     """
312     path = params[0]
313     einfo = backend.ExportInfo(path)
314     if einfo is None:
315       return einfo
316     return einfo.Dumps()
317
318   @staticmethod
319   def perspective_export_list(params):
320     """List the available exports on this node.
321
322     Note that as opposed to export_info, which may query data about an
323     export in any path, this only queries the standard Ganeti path
324     (constants.EXPORT_DIR).
325
326     """
327     return backend.ListExports()
328
329   @staticmethod
330   def perspective_export_remove(params):
331     """Remove an export.
332
333     """
334     export = params[0]
335     return backend.RemoveExport(export)
336
337   # volume  --------------------------
338
339   @staticmethod
340   def perspective_volume_list(params):
341     """Query the list of logical volumes in a given volume group.
342
343     """
344     vgname = params[0]
345     return backend.GetVolumeList(vgname)
346
347   @staticmethod
348   def perspective_vg_list(params):
349     """Query the list of volume groups.
350
351     """
352     return backend.ListVolumeGroups()
353
354   # bridge  --------------------------
355
356   @staticmethod
357   def perspective_bridges_exist(params):
358     """Check if all bridges given exist on this node.
359
360     """
361     bridges_list = params[0]
362     return backend.BridgesExist(bridges_list)
363
364   # instance  --------------------------
365
366   @staticmethod
367   def perspective_instance_os_add(params):
368     """Install an OS on a given instance.
369
370     """
371     inst_s = params[0]
372     inst = objects.Instance.FromDict(inst_s)
373     reinstall = params[1]
374     return backend.InstanceOsAdd(inst, reinstall)
375
376   @staticmethod
377   def perspective_instance_run_rename(params):
378     """Runs the OS rename script for an instance.
379
380     """
381     inst_s, old_name = params
382     inst = objects.Instance.FromDict(inst_s)
383     return backend.RunRenameInstance(inst, old_name)
384
385   @staticmethod
386   def perspective_instance_os_import(params):
387     """Run the import function of an OS onto a given instance.
388
389     """
390     inst_s, src_node, src_images, cluster_name = params
391     inst = objects.Instance.FromDict(inst_s)
392     return backend.ImportOSIntoInstance(inst, src_node, src_images,
393                                         cluster_name)
394
395   @staticmethod
396   def perspective_instance_shutdown(params):
397     """Shutdown an instance.
398
399     """
400     instance = objects.Instance.FromDict(params[0])
401     return backend.InstanceShutdown(instance)
402
403   @staticmethod
404   def perspective_instance_start(params):
405     """Start an instance.
406
407     """
408     instance = objects.Instance.FromDict(params[0])
409     return backend.StartInstance(instance)
410
411   @staticmethod
412   def perspective_migration_info(params):
413     """Gather information about an instance to be migrated.
414
415     """
416     instance = objects.Instance.FromDict(params[0])
417     return backend.MigrationInfo(instance)
418
419   @staticmethod
420   def perspective_accept_instance(params):
421     """Prepare the node to accept an instance.
422
423     """
424     instance, info, target = params
425     instance = objects.Instance.FromDict(instance)
426     return backend.AcceptInstance(instance, info, target)
427
428   @staticmethod
429   def perspective_finalize_migration(params):
430     """Finalize the instance migration.
431
432     """
433     instance, info, success = params
434     instance = objects.Instance.FromDict(instance)
435     return backend.FinalizeMigration(instance, info, success)
436
437   @staticmethod
438   def perspective_instance_migrate(params):
439     """Migrates an instance.
440
441     """
442     instance, target, live = params
443     instance = objects.Instance.FromDict(instance)
444     return backend.MigrateInstance(instance, target, live)
445
446   @staticmethod
447   def perspective_instance_reboot(params):
448     """Reboot an instance.
449
450     """
451     instance = objects.Instance.FromDict(params[0])
452     reboot_type = params[1]
453     return backend.InstanceReboot(instance, reboot_type)
454
455   @staticmethod
456   def perspective_instance_info(params):
457     """Query instance information.
458
459     """
460     return backend.GetInstanceInfo(params[0], params[1])
461
462   @staticmethod
463   def perspective_instance_migratable(params):
464     """Query whether the specified instance can be migrated.
465
466     """
467     instance = objects.Instance.FromDict(params[0])
468     return backend.GetInstanceMigratable(instance)
469
470   @staticmethod
471   def perspective_all_instances_info(params):
472     """Query information about all instances.
473
474     """
475     return backend.GetAllInstancesInfo(params[0])
476
477   @staticmethod
478   def perspective_instance_list(params):
479     """Query the list of running instances.
480
481     """
482     return backend.GetInstanceList(params[0])
483
484   # node --------------------------
485
486   @staticmethod
487   def perspective_node_tcp_ping(params):
488     """Do a TcpPing on the remote node.
489
490     """
491     return utils.TcpPing(params[1], params[2], timeout=params[3],
492                          live_port_needed=params[4], source=params[0])
493
494   @staticmethod
495   def perspective_node_has_ip_address(params):
496     """Checks if a node has the given ip address.
497
498     """
499     return utils.OwnIpAddress(params[0])
500
501   @staticmethod
502   def perspective_node_info(params):
503     """Query node information.
504
505     """
506     vgname, hypervisor_type = params
507     return backend.GetNodeInfo(vgname, hypervisor_type)
508
509   @staticmethod
510   def perspective_node_add(params):
511     """Complete the registration of this node in the cluster.
512
513     """
514     return backend.AddNode(params[0], params[1], params[2],
515                            params[3], params[4], params[5])
516
517   @staticmethod
518   def perspective_node_verify(params):
519     """Run a verify sequence on this node.
520
521     """
522     return backend.VerifyNode(params[0], params[1])
523
524   @staticmethod
525   def perspective_node_start_master(params):
526     """Promote this node to master status.
527
528     """
529     return backend.StartMaster(params[0])
530
531   @staticmethod
532   def perspective_node_stop_master(params):
533     """Demote this node from master status.
534
535     """
536     return backend.StopMaster(params[0])
537
538   @staticmethod
539   def perspective_node_leave_cluster(params):
540     """Cleanup after leaving a cluster.
541
542     """
543     return backend.LeaveCluster()
544
545   @staticmethod
546   def perspective_node_volumes(params):
547     """Query the list of all logical volume groups.
548
549     """
550     return backend.NodeVolumes()
551
552   @staticmethod
553   def perspective_node_demote_from_mc(params):
554     """Demote a node from the master candidate role.
555
556     """
557     return backend.DemoteFromMC()
558
559
560   @staticmethod
561   def perspective_node_powercycle(params):
562     """Tries to powercycle the nod.
563
564     """
565     hypervisor_type = params[0]
566     return backend.PowercycleNode(hypervisor_type)
567
568
569   # cluster --------------------------
570
571   @staticmethod
572   def perspective_version(params):
573     """Query version information.
574
575     """
576     return constants.PROTOCOL_VERSION
577
578   @staticmethod
579   def perspective_upload_file(params):
580     """Upload a file.
581
582     Note that the backend implementation imposes strict rules on which
583     files are accepted.
584
585     """
586     return backend.UploadFile(*params)
587
588   @staticmethod
589   def perspective_master_info(params):
590     """Query master information.
591
592     """
593     return backend.GetMasterInfo()
594
595   @staticmethod
596   def perspective_write_ssconf_files(params):
597     """Write ssconf files.
598
599     """
600     (values,) = params
601     return backend.WriteSsconfFiles(values)
602
603   # os -----------------------
604
605   @staticmethod
606   def perspective_os_diagnose(params):
607     """Query detailed information about existing OSes.
608
609     """
610     return [os_obj.ToDict() for os_obj in backend.DiagnoseOS()]
611
612   @staticmethod
613   def perspective_os_get(params):
614     """Query information about a given OS.
615
616     """
617     name = params[0]
618     try:
619       os_obj = backend.OSFromDisk(name)
620     except errors.InvalidOS, err:
621       os_obj = objects.OS.FromInvalidOS(err)
622     return os_obj.ToDict()
623
624   # hooks -----------------------
625
626   @staticmethod
627   def perspective_hooks_runner(params):
628     """Run hook scripts.
629
630     """
631     hpath, phase, env = params
632     hr = backend.HooksRunner()
633     return hr.RunHooks(hpath, phase, env)
634
635   # iallocator -----------------
636
637   @staticmethod
638   def perspective_iallocator_runner(params):
639     """Run an iallocator script.
640
641     """
642     name, idata = params
643     iar = backend.IAllocatorRunner()
644     return iar.Run(name, idata)
645
646   # test -----------------------
647
648   @staticmethod
649   def perspective_test_delay(params):
650     """Run test delay.
651
652     """
653     duration = params[0]
654     return utils.TestDelay(duration)
655
656   # file storage ---------------
657
658   @staticmethod
659   def perspective_file_storage_dir_create(params):
660     """Create the file storage directory.
661
662     """
663     file_storage_dir = params[0]
664     return backend.CreateFileStorageDir(file_storage_dir)
665
666   @staticmethod
667   def perspective_file_storage_dir_remove(params):
668     """Remove the file storage directory.
669
670     """
671     file_storage_dir = params[0]
672     return backend.RemoveFileStorageDir(file_storage_dir)
673
674   @staticmethod
675   def perspective_file_storage_dir_rename(params):
676     """Rename the file storage directory.
677
678     """
679     old_file_storage_dir = params[0]
680     new_file_storage_dir = params[1]
681     return backend.RenameFileStorageDir(old_file_storage_dir,
682                                         new_file_storage_dir)
683
684   # jobs ------------------------
685
686   @staticmethod
687   @_RequireJobQueueLock
688   def perspective_jobqueue_update(params):
689     """Update job queue.
690
691     """
692     (file_name, content) = params
693     return backend.JobQueueUpdate(file_name, content)
694
695   @staticmethod
696   @_RequireJobQueueLock
697   def perspective_jobqueue_purge(params):
698     """Purge job queue.
699
700     """
701     return backend.JobQueuePurge()
702
703   @staticmethod
704   @_RequireJobQueueLock
705   def perspective_jobqueue_rename(params):
706     """Rename a job queue file.
707
708     """
709     # TODO: What if a file fails to rename?
710     return [backend.JobQueueRename(old, new) for old, new in params]
711
712   @staticmethod
713   def perspective_jobqueue_set_drain(params):
714     """Set/unset the queue drain flag.
715
716     """
717     drain_flag = params[0]
718     return backend.JobQueueSetDrainFlag(drain_flag)
719
720
721   # hypervisor ---------------
722
723   @staticmethod
724   def perspective_hypervisor_validate_params(params):
725     """Validate the hypervisor parameters.
726
727     """
728     (hvname, hvparams) = params
729     return backend.ValidateHVParams(hvname, hvparams)
730
731
732 def ParseOptions():
733   """Parse the command line options.
734
735   @return: (options, args) as from OptionParser.parse_args()
736
737   """
738   parser = OptionParser(description="Ganeti node daemon",
739                         usage="%prog [-f] [-d] [-b ADDRESS]",
740                         version="%%prog (ganeti) %s" %
741                         constants.RELEASE_VERSION)
742
743   parser.add_option("-f", "--foreground", dest="fork",
744                     help="Don't detach from the current terminal",
745                     default=True, action="store_false")
746   parser.add_option("-d", "--debug", dest="debug",
747                     help="Enable some debug messages",
748                     default=False, action="store_true")
749   parser.add_option("-b", "--bind", dest="bind_address",
750                     help="Bind address",
751                     default="", metavar="ADDRESS")
752
753   options, args = parser.parse_args()
754   return options, args
755
756
757 def main():
758   """Main function for the node daemon.
759
760   """
761   global queue_lock
762
763   options, args = ParseOptions()
764   utils.debug = options.debug
765
766   if options.fork:
767     utils.CloseFDs()
768
769   for fname in (constants.SSL_CERT_FILE,):
770     if not os.path.isfile(fname):
771       print "config %s not there, will not run." % fname
772       sys.exit(5)
773
774   try:
775     port = utils.GetNodeDaemonPort()
776   except errors.ConfigurationError, err:
777     print "Cluster configuration incomplete: '%s'" % str(err)
778     sys.exit(5)
779
780   dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
781   dirs.append((constants.LOG_OS_DIR, 0750))
782   dirs.append((constants.LOCK_DIR, 1777))
783   utils.EnsureDirs(dirs)
784
785   # become a daemon
786   if options.fork:
787     utils.Daemonize(logfile=constants.LOG_NODESERVER)
788
789   utils.WritePidFile(constants.NODED_PID)
790   try:
791     utils.SetupLogging(logfile=constants.LOG_NODESERVER, debug=options.debug,
792                        stderr_logging=not options.fork)
793     logging.info("ganeti node daemon startup")
794
795     # Read SSL certificate
796     ssl_params = http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
797                                     ssl_cert_path=constants.SSL_CERT_FILE)
798
799     # Prepare job queue
800     queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
801
802     mainloop = daemon.Mainloop()
803     server = NodeHttpServer(mainloop, options.bind_address, port,
804                             ssl_params=ssl_params, ssl_verify_peer=True)
805     server.Start()
806     try:
807       mainloop.Run()
808     finally:
809       server.Stop()
810   finally:
811     utils.RemovePidFile(constants.NODED_PID)
812
813
814 if __name__ == '__main__':
815   main()