Fix burnin problems when using http checks
[ganeti-local] / daemons / ganeti-noded
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti node daemon"""
23
24 # functions in this module need to have a given name structure, so:
25 # pylint: disable-msg=C0103
26
27 import os
28 import sys
29 import traceback
30 import SocketServer
31 import errno
32 import logging
33 import signal
34
35 from optparse import OptionParser
36
37 from ganeti import backend
38 from ganeti import constants
39 from ganeti import objects
40 from ganeti import errors
41 from ganeti import jstore
42 from ganeti import daemon
43 from ganeti import http
44 from ganeti import utils
45
46 import ganeti.http.server
47
48
49 queue_lock = None
50
51
52 def _RequireJobQueueLock(fn):
53   """Decorator for job queue manipulating functions.
54
55   """
56   QUEUE_LOCK_TIMEOUT = 10
57
58   def wrapper(*args, **kwargs):
59     # Locking in exclusive, blocking mode because there could be several
60     # children running at the same time. Waiting up to 10 seconds.
61     queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
62     try:
63       return fn(*args, **kwargs)
64     finally:
65       queue_lock.Unlock()
66
67   return wrapper
68
69
70 class NodeHttpServer(http.server.HttpServer):
71   """The server implementation.
72
73   This class holds all methods exposed over the RPC interface.
74
75   """
76   def __init__(self, *args, **kwargs):
77     http.server.HttpServer.__init__(self, *args, **kwargs)
78     self.noded_pid = os.getpid()
79
80   def HandleRequest(self, req):
81     """Handle a request.
82
83     """
84     if req.request_method.upper() != http.HTTP_PUT:
85       raise http.HttpBadRequest()
86
87     path = req.request_path
88     if path.startswith("/"):
89       path = path[1:]
90
91     method = getattr(self, "perspective_%s" % path, None)
92     if method is None:
93       raise http.HttpNotFound()
94
95     try:
96       try:
97         return method(req.request_body)
98       except:
99         logging.exception("Error in RPC call")
100         raise
101     except errors.QuitGanetiException, err:
102       # Tell parent to quit
103       os.kill(self.noded_pid, signal.SIGTERM)
104
105   # the new block devices  --------------------------
106
107   @staticmethod
108   def perspective_blockdev_create(params):
109     """Create a block device.
110
111     """
112     bdev_s, size, owner, on_primary, info = params
113     bdev = objects.Disk.FromDict(bdev_s)
114     if bdev is None:
115       raise ValueError("can't unserialize data!")
116     return backend.CreateBlockDevice(bdev, size, owner, on_primary, info)
117
118   @staticmethod
119   def perspective_blockdev_remove(params):
120     """Remove a block device.
121
122     """
123     bdev_s = params[0]
124     bdev = objects.Disk.FromDict(bdev_s)
125     return backend.RemoveBlockDevice(bdev)
126
127   @staticmethod
128   def perspective_blockdev_rename(params):
129     """Remove a block device.
130
131     """
132     devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
133     return backend.RenameBlockDevices(devlist)
134
135   @staticmethod
136   def perspective_blockdev_assemble(params):
137     """Assemble a block device.
138
139     """
140     bdev_s, owner, on_primary = params
141     bdev = objects.Disk.FromDict(bdev_s)
142     if bdev is None:
143       raise ValueError("can't unserialize data!")
144     return backend.AssembleBlockDevice(bdev, owner, on_primary)
145
146   @staticmethod
147   def perspective_blockdev_shutdown(params):
148     """Shutdown a block device.
149
150     """
151     bdev_s = params[0]
152     bdev = objects.Disk.FromDict(bdev_s)
153     if bdev is None:
154       raise ValueError("can't unserialize data!")
155     return backend.ShutdownBlockDevice(bdev)
156
157   @staticmethod
158   def perspective_blockdev_addchildren(params):
159     """Add a child to a mirror device.
160
161     Note: this is only valid for mirror devices. It's the caller's duty
162     to send a correct disk, otherwise we raise an error.
163
164     """
165     bdev_s, ndev_s = params
166     bdev = objects.Disk.FromDict(bdev_s)
167     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
168     if bdev is None or ndevs.count(None) > 0:
169       raise ValueError("can't unserialize data!")
170     return backend.MirrorAddChildren(bdev, ndevs)
171
172   @staticmethod
173   def perspective_blockdev_removechildren(params):
174     """Remove a child from a mirror device.
175
176     This is only valid for mirror devices, of course. It's the callers
177     duty to send a correct disk, otherwise we raise an error.
178
179     """
180     bdev_s, ndev_s = params
181     bdev = objects.Disk.FromDict(bdev_s)
182     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
183     if bdev is None or ndevs.count(None) > 0:
184       raise ValueError("can't unserialize data!")
185     return backend.MirrorRemoveChildren(bdev, ndevs)
186
187   @staticmethod
188   def perspective_blockdev_getmirrorstatus(params):
189     """Return the mirror status for a list of disks.
190
191     """
192     disks = [objects.Disk.FromDict(dsk_s)
193             for dsk_s in params]
194     return backend.GetMirrorStatus(disks)
195
196   @staticmethod
197   def perspective_blockdev_find(params):
198     """Expose the FindBlockDevice functionality for a disk.
199
200     This will try to find but not activate a disk.
201
202     """
203     disk = objects.Disk.FromDict(params[0])
204     return backend.FindBlockDevice(disk)
205
206   @staticmethod
207   def perspective_blockdev_snapshot(params):
208     """Create a snapshot device.
209
210     Note that this is only valid for LVM disks, if we get passed
211     something else we raise an exception. The snapshot device can be
212     remove by calling the generic block device remove call.
213
214     """
215     cfbd = objects.Disk.FromDict(params[0])
216     return backend.SnapshotBlockDevice(cfbd)
217
218   @staticmethod
219   def perspective_blockdev_grow(params):
220     """Grow a stack of devices.
221
222     """
223     cfbd = objects.Disk.FromDict(params[0])
224     amount = params[1]
225     return backend.GrowBlockDevice(cfbd, amount)
226
227   @staticmethod
228   def perspective_blockdev_close(params):
229     """Closes the given block devices.
230
231     """
232     disks = [objects.Disk.FromDict(cf) for cf in params[1]]
233     return backend.CloseBlockDevices(params[0], disks)
234
235   # blockdev/drbd specific methods ----------
236
237   @staticmethod
238   def perspective_drbd_disconnect_net(params):
239     """Disconnects the network connection of drbd disks.
240
241     Note that this is only valid for drbd disks, so the members of the
242     disk list must all be drbd devices.
243
244     """
245     nodes_ip, disks = params
246     disks = [objects.Disk.FromDict(cf) for cf in disks]
247     return backend.DrbdDisconnectNet(nodes_ip, disks)
248
249   @staticmethod
250   def perspective_drbd_attach_net(params):
251     """Attaches the network connection of drbd disks.
252
253     Note that this is only valid for drbd disks, so the members of the
254     disk list must all be drbd devices.
255
256     """
257     nodes_ip, disks, instance_name, multimaster = params
258     disks = [objects.Disk.FromDict(cf) for cf in disks]
259     return backend.DrbdAttachNet(nodes_ip, disks, instance_name, multimaster)
260
261   @staticmethod
262   def perspective_drbd_wait_sync(params):
263     """Wait until DRBD disks are synched.
264
265     Note that this is only valid for drbd disks, so the members of the
266     disk list must all be drbd devices.
267
268     """
269     nodes_ip, disks = params
270     disks = [objects.Disk.FromDict(cf) for cf in disks]
271     return backend.DrbdWaitSync(nodes_ip, disks)
272
273   # export/import  --------------------------
274
275   @staticmethod
276   def perspective_snapshot_export(params):
277     """Export a given snapshot.
278
279     """
280     disk = objects.Disk.FromDict(params[0])
281     dest_node = params[1]
282     instance = objects.Instance.FromDict(params[2])
283     cluster_name = params[3]
284     dev_idx = params[4]
285     return backend.ExportSnapshot(disk, dest_node, instance,
286                                   cluster_name, dev_idx)
287
288   @staticmethod
289   def perspective_finalize_export(params):
290     """Expose the finalize export functionality.
291
292     """
293     instance = objects.Instance.FromDict(params[0])
294     snap_disks = [objects.Disk.FromDict(str_data)
295                   for str_data in params[1]]
296     return backend.FinalizeExport(instance, snap_disks)
297
298   @staticmethod
299   def perspective_export_info(params):
300     """Query information about an existing export on this node.
301
302     The given path may not contain an export, in which case we return
303     None.
304
305     """
306     path = params[0]
307     einfo = backend.ExportInfo(path)
308     if einfo is None:
309       return einfo
310     return einfo.Dumps()
311
312   @staticmethod
313   def perspective_export_list(params):
314     """List the available exports on this node.
315
316     Note that as opposed to export_info, which may query data about an
317     export in any path, this only queries the standard Ganeti path
318     (constants.EXPORT_DIR).
319
320     """
321     return backend.ListExports()
322
323   @staticmethod
324   def perspective_export_remove(params):
325     """Remove an export.
326
327     """
328     export = params[0]
329     return backend.RemoveExport(export)
330
331   # volume  --------------------------
332
333   @staticmethod
334   def perspective_volume_list(params):
335     """Query the list of logical volumes in a given volume group.
336
337     """
338     vgname = params[0]
339     return backend.GetVolumeList(vgname)
340
341   @staticmethod
342   def perspective_vg_list(params):
343     """Query the list of volume groups.
344
345     """
346     return backend.ListVolumeGroups()
347
348   # bridge  --------------------------
349
350   @staticmethod
351   def perspective_bridges_exist(params):
352     """Check if all bridges given exist on this node.
353
354     """
355     bridges_list = params[0]
356     return backend.BridgesExist(bridges_list)
357
358   # instance  --------------------------
359
360   @staticmethod
361   def perspective_instance_os_add(params):
362     """Install an OS on a given instance.
363
364     """
365     inst_s = params[0]
366     inst = objects.Instance.FromDict(inst_s)
367     return backend.AddOSToInstance(inst)
368
369   @staticmethod
370   def perspective_instance_run_rename(params):
371     """Runs the OS rename script for an instance.
372
373     """
374     inst_s, old_name = params
375     inst = objects.Instance.FromDict(inst_s)
376     return backend.RunRenameInstance(inst, old_name)
377
378   @staticmethod
379   def perspective_instance_os_import(params):
380     """Run the import function of an OS onto a given instance.
381
382     """
383     inst_s, src_node, src_images, cluster_name = params
384     inst = objects.Instance.FromDict(inst_s)
385     return backend.ImportOSIntoInstance(inst, src_node, src_images,
386                                         cluster_name)
387
388   @staticmethod
389   def perspective_instance_shutdown(params):
390     """Shutdown an instance.
391
392     """
393     instance = objects.Instance.FromDict(params[0])
394     return backend.ShutdownInstance(instance)
395
396   @staticmethod
397   def perspective_instance_start(params):
398     """Start an instance.
399
400     """
401     instance = objects.Instance.FromDict(params[0])
402     extra_args = params[1]
403     return backend.StartInstance(instance, extra_args)
404
405   @staticmethod
406   def perspective_instance_migrate(params):
407     """Migrates an instance.
408
409     """
410     instance, target, live = params
411     instance = objects.Instance.FromDict(instance)
412     return backend.MigrateInstance(instance, target, live)
413
414   @staticmethod
415   def perspective_instance_reboot(params):
416     """Reboot an instance.
417
418     """
419     instance = objects.Instance.FromDict(params[0])
420     reboot_type = params[1]
421     extra_args = params[2]
422     return backend.RebootInstance(instance, reboot_type, extra_args)
423
424   @staticmethod
425   def perspective_instance_info(params):
426     """Query instance information.
427
428     """
429     return backend.GetInstanceInfo(params[0], params[1])
430
431   @staticmethod
432   def perspective_instance_migratable(params):
433     """Query whether the specified instance can be migrated.
434
435     """
436     instance = objects.Instance.FromDict(params[0])
437     return backend.GetInstanceMigratable(instance)
438
439   @staticmethod
440   def perspective_all_instances_info(params):
441     """Query information about all instances.
442
443     """
444     return backend.GetAllInstancesInfo(params[0])
445
446   @staticmethod
447   def perspective_instance_list(params):
448     """Query the list of running instances.
449
450     """
451     return backend.GetInstanceList(params[0])
452
453   # node --------------------------
454
455   @staticmethod
456   def perspective_node_tcp_ping(params):
457     """Do a TcpPing on the remote node.
458
459     """
460     return utils.TcpPing(params[1], params[2], timeout=params[3],
461                          live_port_needed=params[4], source=params[0])
462
463   @staticmethod
464   def perspective_node_has_ip_address(params):
465     """Checks if a node has the given ip address.
466
467     """
468     return utils.OwnIpAddress(params[0])
469
470   @staticmethod
471   def perspective_node_info(params):
472     """Query node information.
473
474     """
475     vgname, hypervisor_type = params
476     return backend.GetNodeInfo(vgname, hypervisor_type)
477
478   @staticmethod
479   def perspective_node_add(params):
480     """Complete the registration of this node in the cluster.
481
482     """
483     return backend.AddNode(params[0], params[1], params[2],
484                            params[3], params[4], params[5])
485
486   @staticmethod
487   def perspective_node_verify(params):
488     """Run a verify sequence on this node.
489
490     """
491     return backend.VerifyNode(params[0], params[1])
492
493   @staticmethod
494   def perspective_node_start_master(params):
495     """Promote this node to master status.
496
497     """
498     return backend.StartMaster(params[0])
499
500   @staticmethod
501   def perspective_node_stop_master(params):
502     """Demote this node from master status.
503
504     """
505     return backend.StopMaster(params[0])
506
507   @staticmethod
508   def perspective_node_leave_cluster(params):
509     """Cleanup after leaving a cluster.
510
511     """
512     return backend.LeaveCluster()
513
514   @staticmethod
515   def perspective_node_volumes(params):
516     """Query the list of all logical volume groups.
517
518     """
519     return backend.NodeVolumes()
520
521   @staticmethod
522   def perspective_node_demote_from_mc(params):
523     """Demote a node from the master candidate role.
524
525     """
526     return backend.DemoteFromMC()
527
528
529   # cluster --------------------------
530
531   @staticmethod
532   def perspective_version(params):
533     """Query version information.
534
535     """
536     return constants.PROTOCOL_VERSION
537
538   @staticmethod
539   def perspective_upload_file(params):
540     """Upload a file.
541
542     Note that the backend implementation imposes strict rules on which
543     files are accepted.
544
545     """
546     return backend.UploadFile(*params)
547
548   @staticmethod
549   def perspective_master_info(params):
550     """Query master information.
551
552     """
553     return backend.GetMasterInfo()
554
555   @staticmethod
556   def perspective_write_ssconf_files(params):
557     """Write ssconf files.
558
559     """
560     (values,) = params
561     return backend.WriteSsconfFiles(values)
562
563   # os -----------------------
564
565   @staticmethod
566   def perspective_os_diagnose(params):
567     """Query detailed information about existing OSes.
568
569     """
570     return [os.ToDict() for os in backend.DiagnoseOS()]
571
572   @staticmethod
573   def perspective_os_get(params):
574     """Query information about a given OS.
575
576     """
577     name = params[0]
578     try:
579       os_obj = backend.OSFromDisk(name)
580     except errors.InvalidOS, err:
581       os_obj = objects.OS.FromInvalidOS(err)
582     return os_obj.ToDict()
583
584   # hooks -----------------------
585
586   @staticmethod
587   def perspective_hooks_runner(params):
588     """Run hook scripts.
589
590     """
591     hpath, phase, env = params
592     hr = backend.HooksRunner()
593     return hr.RunHooks(hpath, phase, env)
594
595   # iallocator -----------------
596
597   @staticmethod
598   def perspective_iallocator_runner(params):
599     """Run an iallocator script.
600
601     """
602     name, idata = params
603     iar = backend.IAllocatorRunner()
604     return iar.Run(name, idata)
605
606   # test -----------------------
607
608   @staticmethod
609   def perspective_test_delay(params):
610     """Run test delay.
611
612     """
613     duration = params[0]
614     return utils.TestDelay(duration)
615
616   # file storage ---------------
617
618   @staticmethod
619   def perspective_file_storage_dir_create(params):
620     """Create the file storage directory.
621
622     """
623     file_storage_dir = params[0]
624     return backend.CreateFileStorageDir(file_storage_dir)
625
626   @staticmethod
627   def perspective_file_storage_dir_remove(params):
628     """Remove the file storage directory.
629
630     """
631     file_storage_dir = params[0]
632     return backend.RemoveFileStorageDir(file_storage_dir)
633
634   @staticmethod
635   def perspective_file_storage_dir_rename(params):
636     """Rename the file storage directory.
637
638     """
639     old_file_storage_dir = params[0]
640     new_file_storage_dir = params[1]
641     return backend.RenameFileStorageDir(old_file_storage_dir,
642                                         new_file_storage_dir)
643
644   # jobs ------------------------
645
646   @staticmethod
647   @_RequireJobQueueLock
648   def perspective_jobqueue_update(params):
649     """Update job queue.
650
651     """
652     (file_name, content) = params
653     return backend.JobQueueUpdate(file_name, content)
654
655   @staticmethod
656   @_RequireJobQueueLock
657   def perspective_jobqueue_purge(params):
658     """Purge job queue.
659
660     """
661     return backend.JobQueuePurge()
662
663   @staticmethod
664   @_RequireJobQueueLock
665   def perspective_jobqueue_rename(params):
666     """Rename a job queue file.
667
668     """
669     # TODO: What if a file fails to rename?
670     return [backend.JobQueueRename(old, new) for old, new in params]
671
672   @staticmethod
673   def perspective_jobqueue_set_drain(params):
674     """Set/unset the queue drain flag.
675
676     """
677     drain_flag = params[0]
678     return backend.JobQueueSetDrainFlag(drain_flag)
679
680
681   # hypervisor ---------------
682
683   @staticmethod
684   def perspective_hypervisor_validate_params(params):
685     """Validate the hypervisor parameters.
686
687     """
688     (hvname, hvparams) = params
689     return backend.ValidateHVParams(hvname, hvparams)
690
691
692 def ParseOptions():
693   """Parse the command line options.
694
695   @return: (options, args) as from OptionParser.parse_args()
696
697   """
698   parser = OptionParser(description="Ganeti node daemon",
699                         usage="%prog [-f] [-d]",
700                         version="%%prog (ganeti) %s" %
701                         constants.RELEASE_VERSION)
702
703   parser.add_option("-f", "--foreground", dest="fork",
704                     help="Don't detach from the current terminal",
705                     default=True, action="store_false")
706   parser.add_option("-d", "--debug", dest="debug",
707                     help="Enable some debug messages",
708                     default=False, action="store_true")
709   options, args = parser.parse_args()
710   return options, args
711
712
713 def EnsureRuntimeEnvironment():
714   """Ensure our run-time environment is complete.
715
716   Currently this creates directories which could be missing, either
717   due to directories being on a tmpfs mount, or due to incomplete
718   packaging.
719
720   """
721   dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
722   dirs.append((constants.LOG_OS_DIR, 0750))
723   for dir_name, dir_mode in dirs:
724     if not os.path.exists(dir_name):
725       try:
726         os.mkdir(dir_name, dir_mode)
727       except EnvironmentError, err:
728         if err.errno != errno.EEXIST:
729           print ("Node setup wrong, cannot create directory '%s': %s" %
730                  (dir_name, err))
731           sys.exit(5)
732     if not os.path.isdir(dir_name):
733       print ("Node setup wrong, '%s' is not a directory" % dir_name)
734       sys.exit(5)
735
736
737 def main():
738   """Main function for the node daemon.
739
740   """
741   global queue_lock
742
743   options, args = ParseOptions()
744   utils.debug = options.debug
745
746   if options.fork:
747     utils.CloseFDs()
748
749   for fname in (constants.SSL_CERT_FILE,):
750     if not os.path.isfile(fname):
751       print "config %s not there, will not run." % fname
752       sys.exit(5)
753
754   try:
755     port = utils.GetNodeDaemonPort()
756   except errors.ConfigurationError, err:
757     print "Cluster configuration incomplete: '%s'" % str(err)
758     sys.exit(5)
759
760   EnsureRuntimeEnvironment()
761
762   # become a daemon
763   if options.fork:
764     utils.Daemonize(logfile=constants.LOG_NODESERVER)
765
766   utils.WritePidFile(constants.NODED_PID)
767   try:
768     utils.SetupLogging(logfile=constants.LOG_NODESERVER, debug=options.debug,
769                        stderr_logging=not options.fork)
770     logging.info("ganeti node daemon startup")
771
772     # Read SSL certificate
773     ssl_params = http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
774                                     ssl_cert_path=constants.SSL_CERT_FILE)
775
776     # Prepare job queue
777     queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
778
779     mainloop = daemon.Mainloop()
780     server = NodeHttpServer(mainloop, "", port,
781                             ssl_params=ssl_params, ssl_verify_peer=True)
782     server.Start()
783     try:
784       mainloop.Run()
785     finally:
786       server.Stop()
787   finally:
788     utils.RemovePidFile(constants.NODED_PID)
789
790
791 if __name__ == '__main__':
792   main()