Add RPC call to update ssconf files
[ganeti-local] / daemons / ganeti-noded
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti node daemon"""
23
24 # functions in this module need to have a given name structure, so:
25 # pylint: disable-msg=C0103
26
27 import os
28 import sys
29 import traceback
30 import SocketServer
31 import errno
32 import logging
33 import signal
34
35 from optparse import OptionParser
36
37 from ganeti import backend
38 from ganeti import constants
39 from ganeti import objects
40 from ganeti import errors
41 from ganeti import jstore
42 from ganeti import daemon
43 from ganeti import http
44 from ganeti import utils
45
46
47 queue_lock = None
48
49
50 def _RequireJobQueueLock(fn):
51   """Decorator for job queue manipulating functions.
52
53   """
54   QUEUE_LOCK_TIMEOUT = 10
55
56   def wrapper(*args, **kwargs):
57     # Locking in exclusive, blocking mode because there could be several
58     # children running at the same time. Waiting up to 10 seconds.
59     queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
60     try:
61       return fn(*args, **kwargs)
62     finally:
63       queue_lock.Unlock()
64
65   return wrapper
66
67
68 class NodeHttpServer(http.HttpServer):
69   """The server implementation.
70
71   This class holds all methods exposed over the RPC interface.
72
73   """
74   def __init__(self, *args, **kwargs):
75     http.HttpServer.__init__(self, *args, **kwargs)
76     self.noded_pid = os.getpid()
77
78   def HandleRequest(self, req):
79     """Handle a request.
80
81     """
82     if req.request_method.upper() != "PUT":
83       raise http.HTTPBadRequest()
84
85     path = req.request_path
86     if path.startswith("/"):
87       path = path[1:]
88
89     method = getattr(self, "perspective_%s" % path, None)
90     if method is None:
91       raise http.HTTPNotFound()
92
93     try:
94       try:
95         return method(req.request_post_data)
96       except:
97         logging.exception("Error in RPC call")
98         raise
99     except errors.QuitGanetiException, err:
100       # Tell parent to quit
101       os.kill(self.noded_pid, signal.SIGTERM)
102
103   # the new block devices  --------------------------
104
105   @staticmethod
106   def perspective_blockdev_create(params):
107     """Create a block device.
108
109     """
110     bdev_s, size, owner, on_primary, info = params
111     bdev = objects.Disk.FromDict(bdev_s)
112     if bdev is None:
113       raise ValueError("can't unserialize data!")
114     return backend.CreateBlockDevice(bdev, size, owner, on_primary, info)
115
116   @staticmethod
117   def perspective_blockdev_remove(params):
118     """Remove a block device.
119
120     """
121     bdev_s = params[0]
122     bdev = objects.Disk.FromDict(bdev_s)
123     return backend.RemoveBlockDevice(bdev)
124
125   @staticmethod
126   def perspective_blockdev_rename(params):
127     """Remove a block device.
128
129     """
130     devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
131     return backend.RenameBlockDevices(devlist)
132
133   @staticmethod
134   def perspective_blockdev_assemble(params):
135     """Assemble a block device.
136
137     """
138     bdev_s, owner, on_primary = params
139     bdev = objects.Disk.FromDict(bdev_s)
140     if bdev is None:
141       raise ValueError("can't unserialize data!")
142     return backend.AssembleBlockDevice(bdev, owner, on_primary)
143
144   @staticmethod
145   def perspective_blockdev_shutdown(params):
146     """Shutdown a block device.
147
148     """
149     bdev_s = params[0]
150     bdev = objects.Disk.FromDict(bdev_s)
151     if bdev is None:
152       raise ValueError("can't unserialize data!")
153     return backend.ShutdownBlockDevice(bdev)
154
155   @staticmethod
156   def perspective_blockdev_addchildren(params):
157     """Add a child to a mirror device.
158
159     Note: this is only valid for mirror devices. It's the caller's duty
160     to send a correct disk, otherwise we raise an error.
161
162     """
163     bdev_s, ndev_s = params
164     bdev = objects.Disk.FromDict(bdev_s)
165     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
166     if bdev is None or ndevs.count(None) > 0:
167       raise ValueError("can't unserialize data!")
168     return backend.MirrorAddChildren(bdev, ndevs)
169
170   @staticmethod
171   def perspective_blockdev_removechildren(params):
172     """Remove a child from a mirror device.
173
174     This is only valid for mirror devices, of course. It's the callers
175     duty to send a correct disk, otherwise we raise an error.
176
177     """
178     bdev_s, ndev_s = params
179     bdev = objects.Disk.FromDict(bdev_s)
180     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
181     if bdev is None or ndevs.count(None) > 0:
182       raise ValueError("can't unserialize data!")
183     return backend.MirrorRemoveChildren(bdev, ndevs)
184
185   @staticmethod
186   def perspective_blockdev_getmirrorstatus(params):
187     """Return the mirror status for a list of disks.
188
189     """
190     disks = [objects.Disk.FromDict(dsk_s)
191             for dsk_s in params]
192     return backend.GetMirrorStatus(disks)
193
194   @staticmethod
195   def perspective_blockdev_find(params):
196     """Expose the FindBlockDevice functionality for a disk.
197
198     This will try to find but not activate a disk.
199
200     """
201     disk = objects.Disk.FromDict(params[0])
202     return backend.FindBlockDevice(disk)
203
204   @staticmethod
205   def perspective_blockdev_snapshot(params):
206     """Create a snapshot device.
207
208     Note that this is only valid for LVM disks, if we get passed
209     something else we raise an exception. The snapshot device can be
210     remove by calling the generic block device remove call.
211
212     """
213     cfbd = objects.Disk.FromDict(params[0])
214     return backend.SnapshotBlockDevice(cfbd)
215
216   @staticmethod
217   def perspective_blockdev_grow(params):
218     """Grow a stack of devices.
219
220     """
221     cfbd = objects.Disk.FromDict(params[0])
222     amount = params[1]
223     return backend.GrowBlockDevice(cfbd, amount)
224
225   @staticmethod
226   def perspective_blockdev_close(params):
227     """Closes the given block devices.
228
229     """
230     disks = [objects.Disk.FromDict(cf) for cf in params]
231     return backend.CloseBlockDevices(disks)
232
233   # export/import  --------------------------
234
235   @staticmethod
236   def perspective_snapshot_export(params):
237     """Export a given snapshot.
238
239     """
240     disk = objects.Disk.FromDict(params[0])
241     dest_node = params[1]
242     instance = objects.Instance.FromDict(params[2])
243     cluster_name = params[3]
244     dev_idx = params[4]
245     return backend.ExportSnapshot(disk, dest_node, instance,
246                                   cluster_name, dev_idx)
247
248   @staticmethod
249   def perspective_finalize_export(params):
250     """Expose the finalize export functionality.
251
252     """
253     instance = objects.Instance.FromDict(params[0])
254     snap_disks = [objects.Disk.FromDict(str_data)
255                   for str_data in params[1]]
256     return backend.FinalizeExport(instance, snap_disks)
257
258   @staticmethod
259   def perspective_export_info(params):
260     """Query information about an existing export on this node.
261
262     The given path may not contain an export, in which case we return
263     None.
264
265     """
266     path = params[0]
267     einfo = backend.ExportInfo(path)
268     if einfo is None:
269       return einfo
270     return einfo.Dumps()
271
272   @staticmethod
273   def perspective_export_list(params):
274     """List the available exports on this node.
275
276     Note that as opposed to export_info, which may query data about an
277     export in any path, this only queries the standard Ganeti path
278     (constants.EXPORT_DIR).
279
280     """
281     return backend.ListExports()
282
283   @staticmethod
284   def perspective_export_remove(params):
285     """Remove an export.
286
287     """
288     export = params[0]
289     return backend.RemoveExport(export)
290
291   # volume  --------------------------
292
293   @staticmethod
294   def perspective_volume_list(params):
295     """Query the list of logical volumes in a given volume group.
296
297     """
298     vgname = params[0]
299     return backend.GetVolumeList(vgname)
300
301   @staticmethod
302   def perspective_vg_list(params):
303     """Query the list of volume groups.
304
305     """
306     return backend.ListVolumeGroups()
307
308   # bridge  --------------------------
309
310   @staticmethod
311   def perspective_bridges_exist(params):
312     """Check if all bridges given exist on this node.
313
314     """
315     bridges_list = params[0]
316     return backend.BridgesExist(bridges_list)
317
318   # instance  --------------------------
319
320   @staticmethod
321   def perspective_instance_os_add(params):
322     """Install an OS on a given instance.
323
324     """
325     inst_s = params[0]
326     inst = objects.Instance.FromDict(inst_s)
327     return backend.AddOSToInstance(inst)
328
329   @staticmethod
330   def perspective_instance_run_rename(params):
331     """Runs the OS rename script for an instance.
332
333     """
334     inst_s, old_name = params
335     inst = objects.Instance.FromDict(inst_s)
336     return backend.RunRenameInstance(inst, old_name)
337
338   @staticmethod
339   def perspective_instance_os_import(params):
340     """Run the import function of an OS onto a given instance.
341
342     """
343     inst_s, src_node, src_images, cluster_name = params
344     inst = objects.Instance.FromDict(inst_s)
345     return backend.ImportOSIntoInstance(inst, src_node, src_images,
346                                         cluster_name)
347
348   @staticmethod
349   def perspective_instance_shutdown(params):
350     """Shutdown an instance.
351
352     """
353     instance = objects.Instance.FromDict(params[0])
354     return backend.ShutdownInstance(instance)
355
356   @staticmethod
357   def perspective_instance_start(params):
358     """Start an instance.
359
360     """
361     instance = objects.Instance.FromDict(params[0])
362     extra_args = params[1]
363     return backend.StartInstance(instance, extra_args)
364
365   @staticmethod
366   def perspective_instance_migrate(params):
367     """Migrates an instance.
368
369     """
370     instance, target, live = params
371     instance = objects.Instance.FromDict(instance)
372     return backend.MigrateInstance(instance, target, live)
373
374   @staticmethod
375   def perspective_instance_reboot(params):
376     """Reboot an instance.
377
378     """
379     instance = objects.Instance.FromDict(params[0])
380     reboot_type = params[1]
381     extra_args = params[2]
382     return backend.RebootInstance(instance, reboot_type, extra_args)
383
384   @staticmethod
385   def perspective_instance_info(params):
386     """Query instance information.
387
388     """
389     return backend.GetInstanceInfo(params[0], params[1])
390
391   @staticmethod
392   def perspective_all_instances_info(params):
393     """Query information about all instances.
394
395     """
396     return backend.GetAllInstancesInfo(params[0])
397
398   @staticmethod
399   def perspective_instance_list(params):
400     """Query the list of running instances.
401
402     """
403     return backend.GetInstanceList(params[0])
404
405   # node --------------------------
406
407   @staticmethod
408   def perspective_node_tcp_ping(params):
409     """Do a TcpPing on the remote node.
410
411     """
412     return utils.TcpPing(params[1], params[2], timeout=params[3],
413                          live_port_needed=params[4], source=params[0])
414
415   @staticmethod
416   def perspective_node_has_ip_address(params):
417     """Checks if a node has the given ip address.
418
419     """
420     return utils.OwnIpAddress(params[0])
421
422   @staticmethod
423   def perspective_node_info(params):
424     """Query node information.
425
426     """
427     vgname, hypervisor_type = params
428     return backend.GetNodeInfo(vgname, hypervisor_type)
429
430   @staticmethod
431   def perspective_node_add(params):
432     """Complete the registration of this node in the cluster.
433
434     """
435     return backend.AddNode(params[0], params[1], params[2],
436                            params[3], params[4], params[5])
437
438   @staticmethod
439   def perspective_node_verify(params):
440     """Run a verify sequence on this node.
441
442     """
443     return backend.VerifyNode(params[0], params[1])
444
445   @staticmethod
446   def perspective_node_start_master(params):
447     """Promote this node to master status.
448
449     """
450     return backend.StartMaster(params[0])
451
452   @staticmethod
453   def perspective_node_stop_master(params):
454     """Demote this node from master status.
455
456     """
457     return backend.StopMaster(params[0])
458
459   @staticmethod
460   def perspective_node_leave_cluster(params):
461     """Cleanup after leaving a cluster.
462
463     """
464     return backend.LeaveCluster()
465
466   @staticmethod
467   def perspective_node_volumes(params):
468     """Query the list of all logical volume groups.
469
470     """
471     return backend.NodeVolumes()
472
473   # cluster --------------------------
474
475   @staticmethod
476   def perspective_version(params):
477     """Query version information.
478
479     """
480     return constants.PROTOCOL_VERSION
481
482   @staticmethod
483   def perspective_upload_file(params):
484     """Upload a file.
485
486     Note that the backend implementation imposes strict rules on which
487     files are accepted.
488
489     """
490     return backend.UploadFile(*params)
491
492   @staticmethod
493   def perspective_master_info(params):
494     """Query master information.
495
496     """
497     return backend.GetMasterInfo()
498
499   @staticmethod
500   def perspective_write_ssconf_files(params):
501     """Write ssconf files.
502
503     """
504     return backend.WriteSsconfFiles()
505
506   # os -----------------------
507
508   @staticmethod
509   def perspective_os_diagnose(params):
510     """Query detailed information about existing OSes.
511
512     """
513     return [os.ToDict() for os in backend.DiagnoseOS()]
514
515   @staticmethod
516   def perspective_os_get(params):
517     """Query information about a given OS.
518
519     """
520     name = params[0]
521     try:
522       os_obj = backend.OSFromDisk(name)
523     except errors.InvalidOS, err:
524       os_obj = objects.OS.FromInvalidOS(err)
525     return os_obj.ToDict()
526
527   # hooks -----------------------
528
529   @staticmethod
530   def perspective_hooks_runner(params):
531     """Run hook scripts.
532
533     """
534     hpath, phase, env = params
535     hr = backend.HooksRunner()
536     return hr.RunHooks(hpath, phase, env)
537
538   # iallocator -----------------
539
540   @staticmethod
541   def perspective_iallocator_runner(params):
542     """Run an iallocator script.
543
544     """
545     name, idata = params
546     iar = backend.IAllocatorRunner()
547     return iar.Run(name, idata)
548
549   # test -----------------------
550
551   @staticmethod
552   def perspective_test_delay(params):
553     """Run test delay.
554
555     """
556     duration = params[0]
557     return utils.TestDelay(duration)
558
559   # file storage ---------------
560
561   @staticmethod
562   def perspective_file_storage_dir_create(params):
563     """Create the file storage directory.
564
565     """
566     file_storage_dir = params[0]
567     return backend.CreateFileStorageDir(file_storage_dir)
568
569   @staticmethod
570   def perspective_file_storage_dir_remove(params):
571     """Remove the file storage directory.
572
573     """
574     file_storage_dir = params[0]
575     return backend.RemoveFileStorageDir(file_storage_dir)
576
577   @staticmethod
578   def perspective_file_storage_dir_rename(params):
579     """Rename the file storage directory.
580
581     """
582     old_file_storage_dir = params[0]
583     new_file_storage_dir = params[1]
584     return backend.RenameFileStorageDir(old_file_storage_dir,
585                                         new_file_storage_dir)
586
587   # jobs ------------------------
588
589   @staticmethod
590   @_RequireJobQueueLock
591   def perspective_jobqueue_update(params):
592     """Update job queue.
593
594     """
595     (file_name, content) = params
596     return backend.JobQueueUpdate(file_name, content)
597
598   @staticmethod
599   @_RequireJobQueueLock
600   def perspective_jobqueue_purge(params):
601     """Purge job queue.
602
603     """
604     return backend.JobQueuePurge()
605
606   @staticmethod
607   @_RequireJobQueueLock
608   def perspective_jobqueue_rename(params):
609     """Rename a job queue file.
610
611     """
612     (old, new) = params
613
614     return backend.JobQueueRename(old, new)
615
616   @staticmethod
617   def perspective_jobqueue_set_drain(params):
618     """Set/unset the queue drain flag.
619
620     """
621     drain_flag = params[0]
622     return backend.JobQueueSetDrainFlag(drain_flag)
623
624
625   # hypervisor ---------------
626
627   @staticmethod
628   def perspective_hypervisor_validate_params(params):
629     """Validate the hypervisor parameters.
630
631     """
632     (hvname, hvparams) = params
633     return backend.ValidateHVParams(hvname, hvparams)
634
635
636 def ParseOptions():
637   """Parse the command line options.
638
639   Returns:
640     (options, args) as from OptionParser.parse_args()
641
642   """
643   parser = OptionParser(description="Ganeti node daemon",
644                         usage="%prog [-f] [-d]",
645                         version="%%prog (ganeti) %s" %
646                         constants.RELEASE_VERSION)
647
648   parser.add_option("-f", "--foreground", dest="fork",
649                     help="Don't detach from the current terminal",
650                     default=True, action="store_false")
651   parser.add_option("-d", "--debug", dest="debug",
652                     help="Enable some debug messages",
653                     default=False, action="store_true")
654   options, args = parser.parse_args()
655   return options, args
656
657
658 def EnsureRuntimeEnvironment():
659   """Ensure our run-time environment is complete.
660
661   Currently this creates directories which could be missing, either
662   due to directories being on a tmpfs mount, or due to incomplete
663   packaging.
664
665   """
666   dirs = [(val, 0755) for val in constants.SUB_RUN_DIRS]
667   dirs.append((constants.LOG_OS_DIR, 0750))
668   for dir_name, dir_mode in dirs:
669     if not os.path.exists(dir_name):
670       try:
671         os.mkdir(dir_name, dir_mode)
672       except EnvironmentError, err:
673         if err.errno != errno.EEXIST:
674           print ("Node setup wrong, cannot create directory '%s': %s" %
675                  (dir_name, err))
676           sys.exit(5)
677     if not os.path.isdir(dir_name):
678       print ("Node setup wrong, '%s' is not a directory" % dir_name)
679       sys.exit(5)
680
681
682 def main():
683   """Main function for the node daemon.
684
685   """
686   global queue_lock
687
688   options, args = ParseOptions()
689   utils.debug = options.debug
690   for fname in (constants.SSL_CERT_FILE,):
691     if not os.path.isfile(fname):
692       print "config %s not there, will not run." % fname
693       sys.exit(5)
694
695   try:
696     port = utils.GetNodeDaemonPort()
697     pwdata = utils.GetNodeDaemonPassword()
698   except errors.ConfigurationError, err:
699     print "Cluster configuration incomplete: '%s'" % str(err)
700     sys.exit(5)
701
702   EnsureRuntimeEnvironment()
703
704   # become a daemon
705   if options.fork:
706     utils.Daemonize(logfile=constants.LOG_NODESERVER)
707
708   utils.WritePidFile(constants.NODED_PID)
709   try:
710     utils.SetupLogging(logfile=constants.LOG_NODESERVER, debug=options.debug,
711                        stderr_logging=not options.fork)
712     logging.info("ganeti node daemon startup")
713
714     # Prepare job queue
715     queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
716
717     mainloop = daemon.Mainloop()
718     server = NodeHttpServer(mainloop, "", port)
719     server.Start()
720     try:
721       mainloop.Run()
722     finally:
723       server.Stop()
724   finally:
725     utils.RemovePidFile(constants.NODED_PID)
726
727
728 if __name__ == '__main__':
729   main()