OS Design: invert new and old name
[ganeti-local] / daemons / ganeti-noded
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti node daemon"""
23
24 # functions in this module need to have a given name structure, so:
25 # pylint: disable-msg=C0103
26
27 import os
28 import sys
29 import traceback
30 import SocketServer
31 import errno
32 import logging
33 import signal
34
35 from optparse import OptionParser
36
37 from ganeti import backend
38 from ganeti import logger
39 from ganeti import constants
40 from ganeti import objects
41 from ganeti import errors
42 from ganeti import jstore
43 from ganeti import daemon
44 from ganeti import http
45 from ganeti import utils
46
47
48 queue_lock = None
49
50
51 def _RequireJobQueueLock(fn):
52   """Decorator for job queue manipulating functions.
53
54   """
55   QUEUE_LOCK_TIMEOUT = 10
56
57   def wrapper(*args, **kwargs):
58     # Locking in exclusive, blocking mode because there could be several
59     # children running at the same time. Waiting up to 10 seconds.
60     queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
61     try:
62       return fn(*args, **kwargs)
63     finally:
64       queue_lock.Unlock()
65
66   return wrapper
67
68
69 class NodeHttpServer(http.HttpServer):
70   """The server implementation.
71
72   This class holds all methods exposed over the RPC interface.
73
74   """
75   def __init__(self, *args, **kwargs):
76     http.HttpServer.__init__(self, *args, **kwargs)
77     self.noded_pid = os.getpid()
78
79   def HandleRequest(self, req):
80     """Handle a request.
81
82     """
83     if req.request_method.upper() != "PUT":
84       raise http.HTTPBadRequest()
85
86     path = req.request_path
87     if path.startswith("/"):
88       path = path[1:]
89
90     method = getattr(self, "perspective_%s" % path, None)
91     if method is None:
92       raise http.HTTPNotFound()
93
94     try:
95       try:
96         return method(req.request_post_data)
97       except:
98         logging.exception("Error in RPC call")
99         raise
100     except errors.QuitGanetiException, err:
101       # Tell parent to quit
102       os.kill(self.noded_pid, signal.SIGTERM)
103
104   # the new block devices  --------------------------
105
106   @staticmethod
107   def perspective_blockdev_create(params):
108     """Create a block device.
109
110     """
111     bdev_s, size, owner, on_primary, info = params
112     bdev = objects.Disk.FromDict(bdev_s)
113     if bdev is None:
114       raise ValueError("can't unserialize data!")
115     return backend.CreateBlockDevice(bdev, size, owner, on_primary, info)
116
117   @staticmethod
118   def perspective_blockdev_remove(params):
119     """Remove a block device.
120
121     """
122     bdev_s = params[0]
123     bdev = objects.Disk.FromDict(bdev_s)
124     return backend.RemoveBlockDevice(bdev)
125
126   @staticmethod
127   def perspective_blockdev_rename(params):
128     """Remove a block device.
129
130     """
131     devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
132     return backend.RenameBlockDevices(devlist)
133
134   @staticmethod
135   def perspective_blockdev_assemble(params):
136     """Assemble a block device.
137
138     """
139     bdev_s, owner, on_primary = params
140     bdev = objects.Disk.FromDict(bdev_s)
141     if bdev is None:
142       raise ValueError("can't unserialize data!")
143     return backend.AssembleBlockDevice(bdev, owner, on_primary)
144
145   @staticmethod
146   def perspective_blockdev_shutdown(params):
147     """Shutdown a block device.
148
149     """
150     bdev_s = params[0]
151     bdev = objects.Disk.FromDict(bdev_s)
152     if bdev is None:
153       raise ValueError("can't unserialize data!")
154     return backend.ShutdownBlockDevice(bdev)
155
156   @staticmethod
157   def perspective_blockdev_addchildren(params):
158     """Add a child to a mirror device.
159
160     Note: this is only valid for mirror devices. It's the caller's duty
161     to send a correct disk, otherwise we raise an error.
162
163     """
164     bdev_s, ndev_s = params
165     bdev = objects.Disk.FromDict(bdev_s)
166     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
167     if bdev is None or ndevs.count(None) > 0:
168       raise ValueError("can't unserialize data!")
169     return backend.MirrorAddChildren(bdev, ndevs)
170
171   @staticmethod
172   def perspective_blockdev_removechildren(params):
173     """Remove a child from a mirror device.
174
175     This is only valid for mirror devices, of course. It's the callers
176     duty to send a correct disk, otherwise we raise an error.
177
178     """
179     bdev_s, ndev_s = params
180     bdev = objects.Disk.FromDict(bdev_s)
181     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
182     if bdev is None or ndevs.count(None) > 0:
183       raise ValueError("can't unserialize data!")
184     return backend.MirrorRemoveChildren(bdev, ndevs)
185
186   @staticmethod
187   def perspective_blockdev_getmirrorstatus(params):
188     """Return the mirror status for a list of disks.
189
190     """
191     disks = [objects.Disk.FromDict(dsk_s)
192             for dsk_s in params]
193     return backend.GetMirrorStatus(disks)
194
195   @staticmethod
196   def perspective_blockdev_find(params):
197     """Expose the FindBlockDevice functionality for a disk.
198
199     This will try to find but not activate a disk.
200
201     """
202     disk = objects.Disk.FromDict(params[0])
203     return backend.FindBlockDevice(disk)
204
205   @staticmethod
206   def perspective_blockdev_snapshot(params):
207     """Create a snapshot device.
208
209     Note that this is only valid for LVM disks, if we get passed
210     something else we raise an exception. The snapshot device can be
211     remove by calling the generic block device remove call.
212
213     """
214     cfbd = objects.Disk.FromDict(params[0])
215     return backend.SnapshotBlockDevice(cfbd)
216
217   @staticmethod
218   def perspective_blockdev_grow(params):
219     """Grow a stack of devices.
220
221     """
222     cfbd = objects.Disk.FromDict(params[0])
223     amount = params[1]
224     return backend.GrowBlockDevice(cfbd, amount)
225
226   @staticmethod
227   def perspective_blockdev_close(params):
228     """Closes the given block devices.
229
230     """
231     disks = [objects.Disk.FromDict(cf) for cf in params]
232     return backend.CloseBlockDevices(disks)
233
234   # export/import  --------------------------
235
236   @staticmethod
237   def perspective_snapshot_export(params):
238     """Export a given snapshot.
239
240     """
241     disk = objects.Disk.FromDict(params[0])
242     dest_node = params[1]
243     instance = objects.Instance.FromDict(params[2])
244     cluster_name = params[3]
245     return backend.ExportSnapshot(disk, dest_node, instance, cluster_name)
246
247   @staticmethod
248   def perspective_finalize_export(params):
249     """Expose the finalize export functionality.
250
251     """
252     instance = objects.Instance.FromDict(params[0])
253     snap_disks = [objects.Disk.FromDict(str_data)
254                   for str_data in params[1]]
255     return backend.FinalizeExport(instance, snap_disks)
256
257   @staticmethod
258   def perspective_export_info(params):
259     """Query information about an existing export on this node.
260
261     The given path may not contain an export, in which case we return
262     None.
263
264     """
265     path = params[0]
266     einfo = backend.ExportInfo(path)
267     if einfo is None:
268       return einfo
269     return einfo.Dumps()
270
271   @staticmethod
272   def perspective_export_list(params):
273     """List the available exports on this node.
274
275     Note that as opposed to export_info, which may query data about an
276     export in any path, this only queries the standard Ganeti path
277     (constants.EXPORT_DIR).
278
279     """
280     return backend.ListExports()
281
282   @staticmethod
283   def perspective_export_remove(params):
284     """Remove an export.
285
286     """
287     export = params[0]
288     return backend.RemoveExport(export)
289
290   # volume  --------------------------
291
292   @staticmethod
293   def perspective_volume_list(params):
294     """Query the list of logical volumes in a given volume group.
295
296     """
297     vgname = params[0]
298     return backend.GetVolumeList(vgname)
299
300   @staticmethod
301   def perspective_vg_list(params):
302     """Query the list of volume groups.
303
304     """
305     return backend.ListVolumeGroups()
306
307   # bridge  --------------------------
308
309   @staticmethod
310   def perspective_bridges_exist(params):
311     """Check if all bridges given exist on this node.
312
313     """
314     bridges_list = params[0]
315     return backend.BridgesExist(bridges_list)
316
317   # instance  --------------------------
318
319   @staticmethod
320   def perspective_instance_os_add(params):
321     """Install an OS on a given instance.
322
323     """
324     inst_s, os_disk, swap_disk = params
325     inst = objects.Instance.FromDict(inst_s)
326     return backend.AddOSToInstance(inst, os_disk, swap_disk)
327
328   @staticmethod
329   def perspective_instance_run_rename(params):
330     """Runs the OS rename script for an instance.
331
332     """
333     inst_s, old_name, os_disk, swap_disk = params
334     inst = objects.Instance.FromDict(inst_s)
335     return backend.RunRenameInstance(inst, old_name, os_disk, swap_disk)
336
337   @staticmethod
338   def perspective_instance_os_import(params):
339     """Run the import function of an OS onto a given instance.
340
341     """
342     inst_s, os_disk, swap_disk, src_node, src_image, cluster_name = params
343     inst = objects.Instance.FromDict(inst_s)
344     return backend.ImportOSIntoInstance(inst, os_disk, swap_disk,
345                                         src_node, src_image, cluster_name)
346
347   @staticmethod
348   def perspective_instance_shutdown(params):
349     """Shutdown an instance.
350
351     """
352     instance = objects.Instance.FromDict(params[0])
353     return backend.ShutdownInstance(instance)
354
355   @staticmethod
356   def perspective_instance_start(params):
357     """Start an instance.
358
359     """
360     instance = objects.Instance.FromDict(params[0])
361     extra_args = params[1]
362     return backend.StartInstance(instance, extra_args)
363
364   @staticmethod
365   def perspective_instance_migrate(params):
366     """Migrates an instance.
367
368     """
369     instance, target, live = params
370     instance = objects.Instance.FromDict(instance)
371     return backend.MigrateInstance(instance, target, live)
372
373   @staticmethod
374   def perspective_instance_reboot(params):
375     """Reboot an instance.
376
377     """
378     instance = objects.Instance.FromDict(params[0])
379     reboot_type = params[1]
380     extra_args = params[2]
381     return backend.RebootInstance(instance, reboot_type, extra_args)
382
383   @staticmethod
384   def perspective_instance_info(params):
385     """Query instance information.
386
387     """
388     return backend.GetInstanceInfo(params[0], params[1])
389
390   @staticmethod
391   def perspective_all_instances_info(params):
392     """Query information about all instances.
393
394     """
395     return backend.GetAllInstancesInfo(params[0])
396
397   @staticmethod
398   def perspective_instance_list(params):
399     """Query the list of running instances.
400
401     """
402     return backend.GetInstanceList(params[0])
403
404   # node --------------------------
405
406   @staticmethod
407   def perspective_node_tcp_ping(params):
408     """Do a TcpPing on the remote node.
409
410     """
411     return utils.TcpPing(params[1], params[2], timeout=params[3],
412                          live_port_needed=params[4], source=params[0])
413
414   @staticmethod
415   def perspective_node_has_ip_address(params):
416     """Checks if a node has the given ip address.
417
418     """
419     return utils.OwnIpAddress(params[0])
420
421   @staticmethod
422   def perspective_node_info(params):
423     """Query node information.
424
425     """
426     vgname, hypervisor_type = params
427     return backend.GetNodeInfo(vgname, hypervisor_type)
428
429   @staticmethod
430   def perspective_node_add(params):
431     """Complete the registration of this node in the cluster.
432
433     """
434     return backend.AddNode(params[0], params[1], params[2],
435                            params[3], params[4], params[5])
436
437   @staticmethod
438   def perspective_node_verify(params):
439     """Run a verify sequence on this node.
440
441     """
442     return backend.VerifyNode(params[0], params[1])
443
444   @staticmethod
445   def perspective_node_start_master(params):
446     """Promote this node to master status.
447
448     """
449     return backend.StartMaster(params[0])
450
451   @staticmethod
452   def perspective_node_stop_master(params):
453     """Demote this node from master status.
454
455     """
456     return backend.StopMaster(params[0])
457
458   @staticmethod
459   def perspective_node_leave_cluster(params):
460     """Cleanup after leaving a cluster.
461
462     """
463     return backend.LeaveCluster()
464
465   @staticmethod
466   def perspective_node_volumes(params):
467     """Query the list of all logical volume groups.
468
469     """
470     return backend.NodeVolumes()
471
472   # cluster --------------------------
473
474   @staticmethod
475   def perspective_version(params):
476     """Query version information.
477
478     """
479     return constants.PROTOCOL_VERSION
480
481   @staticmethod
482   def perspective_upload_file(params):
483     """Upload a file.
484
485     Note that the backend implementation imposes strict rules on which
486     files are accepted.
487
488     """
489     return backend.UploadFile(*params)
490
491   @staticmethod
492   def perspective_master_info(params):
493     """Query master information.
494
495     """
496     return backend.GetMasterInfo()
497
498   # os -----------------------
499
500   @staticmethod
501   def perspective_os_diagnose(params):
502     """Query detailed information about existing OSes.
503
504     """
505     return [os.ToDict() for os in backend.DiagnoseOS()]
506
507   @staticmethod
508   def perspective_os_get(params):
509     """Query information about a given OS.
510
511     """
512     name = params[0]
513     try:
514       os_obj = backend.OSFromDisk(name)
515     except errors.InvalidOS, err:
516       os_obj = objects.OS.FromInvalidOS(err)
517     return os_obj.ToDict()
518
519   # hooks -----------------------
520
521   @staticmethod
522   def perspective_hooks_runner(params):
523     """Run hook scripts.
524
525     """
526     hpath, phase, env = params
527     hr = backend.HooksRunner()
528     return hr.RunHooks(hpath, phase, env)
529
530   # iallocator -----------------
531
532   @staticmethod
533   def perspective_iallocator_runner(params):
534     """Run an iallocator script.
535
536     """
537     name, idata = params
538     iar = backend.IAllocatorRunner()
539     return iar.Run(name, idata)
540
541   # test -----------------------
542
543   @staticmethod
544   def perspective_test_delay(params):
545     """Run test delay.
546
547     """
548     duration = params[0]
549     return utils.TestDelay(duration)
550
551   # file storage ---------------
552
553   @staticmethod
554   def perspective_file_storage_dir_create(params):
555     """Create the file storage directory.
556
557     """
558     file_storage_dir = params[0]
559     return backend.CreateFileStorageDir(file_storage_dir)
560
561   @staticmethod
562   def perspective_file_storage_dir_remove(params):
563     """Remove the file storage directory.
564
565     """
566     file_storage_dir = params[0]
567     return backend.RemoveFileStorageDir(file_storage_dir)
568
569   @staticmethod
570   def perspective_file_storage_dir_rename(params):
571     """Rename the file storage directory.
572
573     """
574     old_file_storage_dir = params[0]
575     new_file_storage_dir = params[1]
576     return backend.RenameFileStorageDir(old_file_storage_dir,
577                                         new_file_storage_dir)
578
579   # jobs ------------------------
580
581   @staticmethod
582   @_RequireJobQueueLock
583   def perspective_jobqueue_update(params):
584     """Update job queue.
585
586     """
587     (file_name, content) = params
588     return backend.JobQueueUpdate(file_name, content)
589
590   @staticmethod
591   @_RequireJobQueueLock
592   def perspective_jobqueue_purge(params):
593     """Purge job queue.
594
595     """
596     return backend.JobQueuePurge()
597
598   @staticmethod
599   @_RequireJobQueueLock
600   def perspective_jobqueue_rename(params):
601     """Rename a job queue file.
602
603     """
604     (old, new) = params
605
606     return backend.JobQueueRename(old, new)
607
608   @staticmethod
609   def perspective_jobqueue_set_drain(params):
610     """Set/unset the queue drain flag.
611
612     """
613     drain_flag = params[0]
614     return backend.JobQueueSetDrainFlag(drain_flag)
615
616
617   # hypervisor ---------------
618
619   @staticmethod
620   def perspective_hypervisor_validate_params(params):
621     """Validate the hypervisor parameters.
622
623     """
624     (hvname, hvparams) = params
625     return backend.ValidateHVParams(hvname, hvparams)
626
627
628 def ParseOptions():
629   """Parse the command line options.
630
631   Returns:
632     (options, args) as from OptionParser.parse_args()
633
634   """
635   parser = OptionParser(description="Ganeti node daemon",
636                         usage="%prog [-f] [-d]",
637                         version="%%prog (ganeti) %s" %
638                         constants.RELEASE_VERSION)
639
640   parser.add_option("-f", "--foreground", dest="fork",
641                     help="Don't detach from the current terminal",
642                     default=True, action="store_false")
643   parser.add_option("-d", "--debug", dest="debug",
644                     help="Enable some debug messages",
645                     default=False, action="store_true")
646   options, args = parser.parse_args()
647   return options, args
648
649
650 def main():
651   """Main function for the node daemon.
652
653   """
654   global queue_lock
655
656   options, args = ParseOptions()
657   utils.debug = options.debug
658   for fname in (constants.SSL_CERT_FILE,):
659     if not os.path.isfile(fname):
660       print "config %s not there, will not run." % fname
661       sys.exit(5)
662
663   try:
664     port = utils.GetNodeDaemonPort()
665     pwdata = utils.GetNodeDaemonPassword()
666   except errors.ConfigurationError, err:
667     print "Cluster configuration incomplete: '%s'" % str(err)
668     sys.exit(5)
669
670   # create the various SUB_RUN_DIRS, if not existing, so that we handle the
671   # situation where RUN_DIR is tmpfs
672   for dir_name in constants.SUB_RUN_DIRS:
673     if not os.path.exists(dir_name):
674       try:
675         os.mkdir(dir_name, 0755)
676       except EnvironmentError, err:
677         if err.errno != errno.EEXIST:
678           print ("Node setup wrong, cannot create directory %s: %s" %
679                  (dir_name, err))
680           sys.exit(5)
681     if not os.path.isdir(dir_name):
682       print ("Node setup wrong, %s is not a directory" % dir_name)
683       sys.exit(5)
684
685   # become a daemon
686   if options.fork:
687     utils.Daemonize(logfile=constants.LOG_NODESERVER)
688
689   utils.WritePidFile(constants.NODED_PID)
690   try:
691     logger.SetupLogging(logfile=constants.LOG_NODESERVER, debug=options.debug,
692                         stderr_logging=not options.fork)
693     logging.info("ganeti node daemon startup")
694
695     # Prepare job queue
696     queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
697
698     mainloop = daemon.Mainloop()
699     server = NodeHttpServer(mainloop, ("", port))
700     server.Start()
701     try:
702       mainloop.Run()
703     finally:
704       server.Stop()
705   finally:
706     utils.RemovePidFile(constants.NODED_PID)
707
708
709 if __name__ == '__main__':
710   main()