Pass ssconf values from master to node
[ganeti-local] / daemons / ganeti-noded
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti node daemon"""
23
24 # functions in this module need to have a given name structure, so:
25 # pylint: disable-msg=C0103
26
27 import os
28 import sys
29 import traceback
30 import SocketServer
31 import errno
32 import logging
33 import signal
34
35 from optparse import OptionParser
36
37 from ganeti import backend
38 from ganeti import constants
39 from ganeti import objects
40 from ganeti import errors
41 from ganeti import jstore
42 from ganeti import daemon
43 from ganeti import http
44 from ganeti import utils
45
46
47 queue_lock = None
48
49
50 def _RequireJobQueueLock(fn):
51   """Decorator for job queue manipulating functions.
52
53   """
54   QUEUE_LOCK_TIMEOUT = 10
55
56   def wrapper(*args, **kwargs):
57     # Locking in exclusive, blocking mode because there could be several
58     # children running at the same time. Waiting up to 10 seconds.
59     queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
60     try:
61       return fn(*args, **kwargs)
62     finally:
63       queue_lock.Unlock()
64
65   return wrapper
66
67
68 class NodeHttpServer(http.HttpServer):
69   """The server implementation.
70
71   This class holds all methods exposed over the RPC interface.
72
73   """
74   def __init__(self, *args, **kwargs):
75     http.HttpServer.__init__(self, *args, **kwargs)
76     self.noded_pid = os.getpid()
77
78   def HandleRequest(self, req):
79     """Handle a request.
80
81     """
82     if req.request_method.upper() != "PUT":
83       raise http.HTTPBadRequest()
84
85     path = req.request_path
86     if path.startswith("/"):
87       path = path[1:]
88
89     method = getattr(self, "perspective_%s" % path, None)
90     if method is None:
91       raise http.HTTPNotFound()
92
93     try:
94       try:
95         return method(req.request_post_data)
96       except:
97         logging.exception("Error in RPC call")
98         raise
99     except errors.QuitGanetiException, err:
100       # Tell parent to quit
101       os.kill(self.noded_pid, signal.SIGTERM)
102
103   # the new block devices  --------------------------
104
105   @staticmethod
106   def perspective_blockdev_create(params):
107     """Create a block device.
108
109     """
110     bdev_s, size, owner, on_primary, info = params
111     bdev = objects.Disk.FromDict(bdev_s)
112     if bdev is None:
113       raise ValueError("can't unserialize data!")
114     return backend.CreateBlockDevice(bdev, size, owner, on_primary, info)
115
116   @staticmethod
117   def perspective_blockdev_remove(params):
118     """Remove a block device.
119
120     """
121     bdev_s = params[0]
122     bdev = objects.Disk.FromDict(bdev_s)
123     return backend.RemoveBlockDevice(bdev)
124
125   @staticmethod
126   def perspective_blockdev_rename(params):
127     """Remove a block device.
128
129     """
130     devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
131     return backend.RenameBlockDevices(devlist)
132
133   @staticmethod
134   def perspective_blockdev_assemble(params):
135     """Assemble a block device.
136
137     """
138     bdev_s, owner, on_primary = params
139     bdev = objects.Disk.FromDict(bdev_s)
140     if bdev is None:
141       raise ValueError("can't unserialize data!")
142     return backend.AssembleBlockDevice(bdev, owner, on_primary)
143
144   @staticmethod
145   def perspective_blockdev_shutdown(params):
146     """Shutdown a block device.
147
148     """
149     bdev_s = params[0]
150     bdev = objects.Disk.FromDict(bdev_s)
151     if bdev is None:
152       raise ValueError("can't unserialize data!")
153     return backend.ShutdownBlockDevice(bdev)
154
155   @staticmethod
156   def perspective_blockdev_addchildren(params):
157     """Add a child to a mirror device.
158
159     Note: this is only valid for mirror devices. It's the caller's duty
160     to send a correct disk, otherwise we raise an error.
161
162     """
163     bdev_s, ndev_s = params
164     bdev = objects.Disk.FromDict(bdev_s)
165     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
166     if bdev is None or ndevs.count(None) > 0:
167       raise ValueError("can't unserialize data!")
168     return backend.MirrorAddChildren(bdev, ndevs)
169
170   @staticmethod
171   def perspective_blockdev_removechildren(params):
172     """Remove a child from a mirror device.
173
174     This is only valid for mirror devices, of course. It's the callers
175     duty to send a correct disk, otherwise we raise an error.
176
177     """
178     bdev_s, ndev_s = params
179     bdev = objects.Disk.FromDict(bdev_s)
180     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
181     if bdev is None or ndevs.count(None) > 0:
182       raise ValueError("can't unserialize data!")
183     return backend.MirrorRemoveChildren(bdev, ndevs)
184
185   @staticmethod
186   def perspective_blockdev_getmirrorstatus(params):
187     """Return the mirror status for a list of disks.
188
189     """
190     disks = [objects.Disk.FromDict(dsk_s)
191             for dsk_s in params]
192     return backend.GetMirrorStatus(disks)
193
194   @staticmethod
195   def perspective_blockdev_find(params):
196     """Expose the FindBlockDevice functionality for a disk.
197
198     This will try to find but not activate a disk.
199
200     """
201     disk = objects.Disk.FromDict(params[0])
202     return backend.FindBlockDevice(disk)
203
204   @staticmethod
205   def perspective_blockdev_snapshot(params):
206     """Create a snapshot device.
207
208     Note that this is only valid for LVM disks, if we get passed
209     something else we raise an exception. The snapshot device can be
210     remove by calling the generic block device remove call.
211
212     """
213     cfbd = objects.Disk.FromDict(params[0])
214     return backend.SnapshotBlockDevice(cfbd)
215
216   @staticmethod
217   def perspective_blockdev_grow(params):
218     """Grow a stack of devices.
219
220     """
221     cfbd = objects.Disk.FromDict(params[0])
222     amount = params[1]
223     return backend.GrowBlockDevice(cfbd, amount)
224
225   @staticmethod
226   def perspective_blockdev_close(params):
227     """Closes the given block devices.
228
229     """
230     disks = [objects.Disk.FromDict(cf) for cf in params]
231     return backend.CloseBlockDevices(disks)
232
233   # export/import  --------------------------
234
235   @staticmethod
236   def perspective_snapshot_export(params):
237     """Export a given snapshot.
238
239     """
240     disk = objects.Disk.FromDict(params[0])
241     dest_node = params[1]
242     instance = objects.Instance.FromDict(params[2])
243     cluster_name = params[3]
244     dev_idx = params[4]
245     return backend.ExportSnapshot(disk, dest_node, instance,
246                                   cluster_name, dev_idx)
247
248   @staticmethod
249   def perspective_finalize_export(params):
250     """Expose the finalize export functionality.
251
252     """
253     instance = objects.Instance.FromDict(params[0])
254     snap_disks = [objects.Disk.FromDict(str_data)
255                   for str_data in params[1]]
256     return backend.FinalizeExport(instance, snap_disks)
257
258   @staticmethod
259   def perspective_export_info(params):
260     """Query information about an existing export on this node.
261
262     The given path may not contain an export, in which case we return
263     None.
264
265     """
266     path = params[0]
267     einfo = backend.ExportInfo(path)
268     if einfo is None:
269       return einfo
270     return einfo.Dumps()
271
272   @staticmethod
273   def perspective_export_list(params):
274     """List the available exports on this node.
275
276     Note that as opposed to export_info, which may query data about an
277     export in any path, this only queries the standard Ganeti path
278     (constants.EXPORT_DIR).
279
280     """
281     return backend.ListExports()
282
283   @staticmethod
284   def perspective_export_remove(params):
285     """Remove an export.
286
287     """
288     export = params[0]
289     return backend.RemoveExport(export)
290
291   # volume  --------------------------
292
293   @staticmethod
294   def perspective_volume_list(params):
295     """Query the list of logical volumes in a given volume group.
296
297     """
298     vgname = params[0]
299     return backend.GetVolumeList(vgname)
300
301   @staticmethod
302   def perspective_vg_list(params):
303     """Query the list of volume groups.
304
305     """
306     return backend.ListVolumeGroups()
307
308   # bridge  --------------------------
309
310   @staticmethod
311   def perspective_bridges_exist(params):
312     """Check if all bridges given exist on this node.
313
314     """
315     bridges_list = params[0]
316     return backend.BridgesExist(bridges_list)
317
318   # instance  --------------------------
319
320   @staticmethod
321   def perspective_instance_os_add(params):
322     """Install an OS on a given instance.
323
324     """
325     inst_s = params[0]
326     inst = objects.Instance.FromDict(inst_s)
327     return backend.AddOSToInstance(inst)
328
329   @staticmethod
330   def perspective_instance_run_rename(params):
331     """Runs the OS rename script for an instance.
332
333     """
334     inst_s, old_name = params
335     inst = objects.Instance.FromDict(inst_s)
336     return backend.RunRenameInstance(inst, old_name)
337
338   @staticmethod
339   def perspective_instance_os_import(params):
340     """Run the import function of an OS onto a given instance.
341
342     """
343     inst_s, src_node, src_images, cluster_name = params
344     inst = objects.Instance.FromDict(inst_s)
345     return backend.ImportOSIntoInstance(inst, src_node, src_images,
346                                         cluster_name)
347
348   @staticmethod
349   def perspective_instance_shutdown(params):
350     """Shutdown an instance.
351
352     """
353     instance = objects.Instance.FromDict(params[0])
354     return backend.ShutdownInstance(instance)
355
356   @staticmethod
357   def perspective_instance_start(params):
358     """Start an instance.
359
360     """
361     instance = objects.Instance.FromDict(params[0])
362     extra_args = params[1]
363     return backend.StartInstance(instance, extra_args)
364
365   @staticmethod
366   def perspective_instance_migrate(params):
367     """Migrates an instance.
368
369     """
370     instance, target, live = params
371     instance = objects.Instance.FromDict(instance)
372     return backend.MigrateInstance(instance, target, live)
373
374   @staticmethod
375   def perspective_instance_reboot(params):
376     """Reboot an instance.
377
378     """
379     instance = objects.Instance.FromDict(params[0])
380     reboot_type = params[1]
381     extra_args = params[2]
382     return backend.RebootInstance(instance, reboot_type, extra_args)
383
384   @staticmethod
385   def perspective_instance_info(params):
386     """Query instance information.
387
388     """
389     return backend.GetInstanceInfo(params[0], params[1])
390
391   @staticmethod
392   def perspective_all_instances_info(params):
393     """Query information about all instances.
394
395     """
396     return backend.GetAllInstancesInfo(params[0])
397
398   @staticmethod
399   def perspective_instance_list(params):
400     """Query the list of running instances.
401
402     """
403     return backend.GetInstanceList(params[0])
404
405   # node --------------------------
406
407   @staticmethod
408   def perspective_node_tcp_ping(params):
409     """Do a TcpPing on the remote node.
410
411     """
412     return utils.TcpPing(params[1], params[2], timeout=params[3],
413                          live_port_needed=params[4], source=params[0])
414
415   @staticmethod
416   def perspective_node_has_ip_address(params):
417     """Checks if a node has the given ip address.
418
419     """
420     return utils.OwnIpAddress(params[0])
421
422   @staticmethod
423   def perspective_node_info(params):
424     """Query node information.
425
426     """
427     vgname, hypervisor_type = params
428     return backend.GetNodeInfo(vgname, hypervisor_type)
429
430   @staticmethod
431   def perspective_node_add(params):
432     """Complete the registration of this node in the cluster.
433
434     """
435     return backend.AddNode(params[0], params[1], params[2],
436                            params[3], params[4], params[5])
437
438   @staticmethod
439   def perspective_node_verify(params):
440     """Run a verify sequence on this node.
441
442     """
443     return backend.VerifyNode(params[0], params[1])
444
445   @staticmethod
446   def perspective_node_start_master(params):
447     """Promote this node to master status.
448
449     """
450     return backend.StartMaster(params[0])
451
452   @staticmethod
453   def perspective_node_stop_master(params):
454     """Demote this node from master status.
455
456     """
457     return backend.StopMaster(params[0])
458
459   @staticmethod
460   def perspective_node_leave_cluster(params):
461     """Cleanup after leaving a cluster.
462
463     """
464     return backend.LeaveCluster()
465
466   @staticmethod
467   def perspective_node_volumes(params):
468     """Query the list of all logical volume groups.
469
470     """
471     return backend.NodeVolumes()
472
473   # cluster --------------------------
474
475   @staticmethod
476   def perspective_version(params):
477     """Query version information.
478
479     """
480     return constants.PROTOCOL_VERSION
481
482   @staticmethod
483   def perspective_upload_file(params):
484     """Upload a file.
485
486     Note that the backend implementation imposes strict rules on which
487     files are accepted.
488
489     """
490     return backend.UploadFile(*params)
491
492   @staticmethod
493   def perspective_master_info(params):
494     """Query master information.
495
496     """
497     return backend.GetMasterInfo()
498
499   @staticmethod
500   def perspective_write_ssconf_files(params):
501     """Write ssconf files.
502
503     """
504     (values,) = params
505     return backend.WriteSsconfFiles(values)
506
507   # os -----------------------
508
509   @staticmethod
510   def perspective_os_diagnose(params):
511     """Query detailed information about existing OSes.
512
513     """
514     return [os.ToDict() for os in backend.DiagnoseOS()]
515
516   @staticmethod
517   def perspective_os_get(params):
518     """Query information about a given OS.
519
520     """
521     name = params[0]
522     try:
523       os_obj = backend.OSFromDisk(name)
524     except errors.InvalidOS, err:
525       os_obj = objects.OS.FromInvalidOS(err)
526     return os_obj.ToDict()
527
528   # hooks -----------------------
529
530   @staticmethod
531   def perspective_hooks_runner(params):
532     """Run hook scripts.
533
534     """
535     hpath, phase, env = params
536     hr = backend.HooksRunner()
537     return hr.RunHooks(hpath, phase, env)
538
539   # iallocator -----------------
540
541   @staticmethod
542   def perspective_iallocator_runner(params):
543     """Run an iallocator script.
544
545     """
546     name, idata = params
547     iar = backend.IAllocatorRunner()
548     return iar.Run(name, idata)
549
550   # test -----------------------
551
552   @staticmethod
553   def perspective_test_delay(params):
554     """Run test delay.
555
556     """
557     duration = params[0]
558     return utils.TestDelay(duration)
559
560   # file storage ---------------
561
562   @staticmethod
563   def perspective_file_storage_dir_create(params):
564     """Create the file storage directory.
565
566     """
567     file_storage_dir = params[0]
568     return backend.CreateFileStorageDir(file_storage_dir)
569
570   @staticmethod
571   def perspective_file_storage_dir_remove(params):
572     """Remove the file storage directory.
573
574     """
575     file_storage_dir = params[0]
576     return backend.RemoveFileStorageDir(file_storage_dir)
577
578   @staticmethod
579   def perspective_file_storage_dir_rename(params):
580     """Rename the file storage directory.
581
582     """
583     old_file_storage_dir = params[0]
584     new_file_storage_dir = params[1]
585     return backend.RenameFileStorageDir(old_file_storage_dir,
586                                         new_file_storage_dir)
587
588   # jobs ------------------------
589
590   @staticmethod
591   @_RequireJobQueueLock
592   def perspective_jobqueue_update(params):
593     """Update job queue.
594
595     """
596     (file_name, content) = params
597     return backend.JobQueueUpdate(file_name, content)
598
599   @staticmethod
600   @_RequireJobQueueLock
601   def perspective_jobqueue_purge(params):
602     """Purge job queue.
603
604     """
605     return backend.JobQueuePurge()
606
607   @staticmethod
608   @_RequireJobQueueLock
609   def perspective_jobqueue_rename(params):
610     """Rename a job queue file.
611
612     """
613     (old, new) = params
614
615     return backend.JobQueueRename(old, new)
616
617   @staticmethod
618   def perspective_jobqueue_set_drain(params):
619     """Set/unset the queue drain flag.
620
621     """
622     drain_flag = params[0]
623     return backend.JobQueueSetDrainFlag(drain_flag)
624
625
626   # hypervisor ---------------
627
628   @staticmethod
629   def perspective_hypervisor_validate_params(params):
630     """Validate the hypervisor parameters.
631
632     """
633     (hvname, hvparams) = params
634     return backend.ValidateHVParams(hvname, hvparams)
635
636
637 def ParseOptions():
638   """Parse the command line options.
639
640   Returns:
641     (options, args) as from OptionParser.parse_args()
642
643   """
644   parser = OptionParser(description="Ganeti node daemon",
645                         usage="%prog [-f] [-d]",
646                         version="%%prog (ganeti) %s" %
647                         constants.RELEASE_VERSION)
648
649   parser.add_option("-f", "--foreground", dest="fork",
650                     help="Don't detach from the current terminal",
651                     default=True, action="store_false")
652   parser.add_option("-d", "--debug", dest="debug",
653                     help="Enable some debug messages",
654                     default=False, action="store_true")
655   options, args = parser.parse_args()
656   return options, args
657
658
659 def EnsureRuntimeEnvironment():
660   """Ensure our run-time environment is complete.
661
662   Currently this creates directories which could be missing, either
663   due to directories being on a tmpfs mount, or due to incomplete
664   packaging.
665
666   """
667   dirs = [(val, 0755) for val in constants.SUB_RUN_DIRS]
668   dirs.append((constants.LOG_OS_DIR, 0750))
669   for dir_name, dir_mode in dirs:
670     if not os.path.exists(dir_name):
671       try:
672         os.mkdir(dir_name, dir_mode)
673       except EnvironmentError, err:
674         if err.errno != errno.EEXIST:
675           print ("Node setup wrong, cannot create directory '%s': %s" %
676                  (dir_name, err))
677           sys.exit(5)
678     if not os.path.isdir(dir_name):
679       print ("Node setup wrong, '%s' is not a directory" % dir_name)
680       sys.exit(5)
681
682
683 def main():
684   """Main function for the node daemon.
685
686   """
687   global queue_lock
688
689   options, args = ParseOptions()
690   utils.debug = options.debug
691   for fname in (constants.SSL_CERT_FILE,):
692     if not os.path.isfile(fname):
693       print "config %s not there, will not run." % fname
694       sys.exit(5)
695
696   try:
697     port = utils.GetNodeDaemonPort()
698   except errors.ConfigurationError, err:
699     print "Cluster configuration incomplete: '%s'" % str(err)
700     sys.exit(5)
701
702   EnsureRuntimeEnvironment()
703
704   # become a daemon
705   if options.fork:
706     utils.Daemonize(logfile=constants.LOG_NODESERVER)
707
708   utils.WritePidFile(constants.NODED_PID)
709   try:
710     utils.SetupLogging(logfile=constants.LOG_NODESERVER, debug=options.debug,
711                        stderr_logging=not options.fork)
712     logging.info("ganeti node daemon startup")
713
714     # Read SSL certificate
715     ssl_params = http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
716                                     ssl_cert_path=constants.SSL_CERT_FILE)
717
718     # Prepare job queue
719     queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
720
721     mainloop = daemon.Mainloop()
722     server = NodeHttpServer(mainloop, "", port,
723                             ssl_params=ssl_params, ssl_verify_peer=True)
724     server.Start()
725     try:
726       mainloop.Run()
727     finally:
728       server.Stop()
729   finally:
730     utils.RemovePidFile(constants.NODED_PID)
731
732
733 if __name__ == '__main__':
734   main()