Abstract runtime creation of dirs into a function
[ganeti-local] / daemons / ganeti-noded
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti node daemon"""
23
24 # functions in this module need to have a given name structure, so:
25 # pylint: disable-msg=C0103
26
27 import os
28 import sys
29 import traceback
30 import SocketServer
31 import errno
32 import logging
33 import signal
34
35 from optparse import OptionParser
36
37 from ganeti import backend
38 from ganeti import constants
39 from ganeti import objects
40 from ganeti import errors
41 from ganeti import jstore
42 from ganeti import daemon
43 from ganeti import http
44 from ganeti import utils
45
46
47 queue_lock = None
48
49
50 def _RequireJobQueueLock(fn):
51   """Decorator for job queue manipulating functions.
52
53   """
54   QUEUE_LOCK_TIMEOUT = 10
55
56   def wrapper(*args, **kwargs):
57     # Locking in exclusive, blocking mode because there could be several
58     # children running at the same time. Waiting up to 10 seconds.
59     queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
60     try:
61       return fn(*args, **kwargs)
62     finally:
63       queue_lock.Unlock()
64
65   return wrapper
66
67
68 class NodeHttpServer(http.HttpServer):
69   """The server implementation.
70
71   This class holds all methods exposed over the RPC interface.
72
73   """
74   def __init__(self, *args, **kwargs):
75     http.HttpServer.__init__(self, *args, **kwargs)
76     self.noded_pid = os.getpid()
77
78   def HandleRequest(self, req):
79     """Handle a request.
80
81     """
82     if req.request_method.upper() != "PUT":
83       raise http.HTTPBadRequest()
84
85     path = req.request_path
86     if path.startswith("/"):
87       path = path[1:]
88
89     method = getattr(self, "perspective_%s" % path, None)
90     if method is None:
91       raise http.HTTPNotFound()
92
93     try:
94       try:
95         return method(req.request_post_data)
96       except:
97         logging.exception("Error in RPC call")
98         raise
99     except errors.QuitGanetiException, err:
100       # Tell parent to quit
101       os.kill(self.noded_pid, signal.SIGTERM)
102
103   # the new block devices  --------------------------
104
105   @staticmethod
106   def perspective_blockdev_create(params):
107     """Create a block device.
108
109     """
110     bdev_s, size, owner, on_primary, info = params
111     bdev = objects.Disk.FromDict(bdev_s)
112     if bdev is None:
113       raise ValueError("can't unserialize data!")
114     return backend.CreateBlockDevice(bdev, size, owner, on_primary, info)
115
116   @staticmethod
117   def perspective_blockdev_remove(params):
118     """Remove a block device.
119
120     """
121     bdev_s = params[0]
122     bdev = objects.Disk.FromDict(bdev_s)
123     return backend.RemoveBlockDevice(bdev)
124
125   @staticmethod
126   def perspective_blockdev_rename(params):
127     """Remove a block device.
128
129     """
130     devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
131     return backend.RenameBlockDevices(devlist)
132
133   @staticmethod
134   def perspective_blockdev_assemble(params):
135     """Assemble a block device.
136
137     """
138     bdev_s, owner, on_primary = params
139     bdev = objects.Disk.FromDict(bdev_s)
140     if bdev is None:
141       raise ValueError("can't unserialize data!")
142     return backend.AssembleBlockDevice(bdev, owner, on_primary)
143
144   @staticmethod
145   def perspective_blockdev_shutdown(params):
146     """Shutdown a block device.
147
148     """
149     bdev_s = params[0]
150     bdev = objects.Disk.FromDict(bdev_s)
151     if bdev is None:
152       raise ValueError("can't unserialize data!")
153     return backend.ShutdownBlockDevice(bdev)
154
155   @staticmethod
156   def perspective_blockdev_addchildren(params):
157     """Add a child to a mirror device.
158
159     Note: this is only valid for mirror devices. It's the caller's duty
160     to send a correct disk, otherwise we raise an error.
161
162     """
163     bdev_s, ndev_s = params
164     bdev = objects.Disk.FromDict(bdev_s)
165     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
166     if bdev is None or ndevs.count(None) > 0:
167       raise ValueError("can't unserialize data!")
168     return backend.MirrorAddChildren(bdev, ndevs)
169
170   @staticmethod
171   def perspective_blockdev_removechildren(params):
172     """Remove a child from a mirror device.
173
174     This is only valid for mirror devices, of course. It's the callers
175     duty to send a correct disk, otherwise we raise an error.
176
177     """
178     bdev_s, ndev_s = params
179     bdev = objects.Disk.FromDict(bdev_s)
180     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
181     if bdev is None or ndevs.count(None) > 0:
182       raise ValueError("can't unserialize data!")
183     return backend.MirrorRemoveChildren(bdev, ndevs)
184
185   @staticmethod
186   def perspective_blockdev_getmirrorstatus(params):
187     """Return the mirror status for a list of disks.
188
189     """
190     disks = [objects.Disk.FromDict(dsk_s)
191             for dsk_s in params]
192     return backend.GetMirrorStatus(disks)
193
194   @staticmethod
195   def perspective_blockdev_find(params):
196     """Expose the FindBlockDevice functionality for a disk.
197
198     This will try to find but not activate a disk.
199
200     """
201     disk = objects.Disk.FromDict(params[0])
202     return backend.FindBlockDevice(disk)
203
204   @staticmethod
205   def perspective_blockdev_snapshot(params):
206     """Create a snapshot device.
207
208     Note that this is only valid for LVM disks, if we get passed
209     something else we raise an exception. The snapshot device can be
210     remove by calling the generic block device remove call.
211
212     """
213     cfbd = objects.Disk.FromDict(params[0])
214     return backend.SnapshotBlockDevice(cfbd)
215
216   @staticmethod
217   def perspective_blockdev_grow(params):
218     """Grow a stack of devices.
219
220     """
221     cfbd = objects.Disk.FromDict(params[0])
222     amount = params[1]
223     return backend.GrowBlockDevice(cfbd, amount)
224
225   @staticmethod
226   def perspective_blockdev_close(params):
227     """Closes the given block devices.
228
229     """
230     disks = [objects.Disk.FromDict(cf) for cf in params]
231     return backend.CloseBlockDevices(disks)
232
233   # export/import  --------------------------
234
235   @staticmethod
236   def perspective_snapshot_export(params):
237     """Export a given snapshot.
238
239     """
240     disk = objects.Disk.FromDict(params[0])
241     dest_node = params[1]
242     instance = objects.Instance.FromDict(params[2])
243     cluster_name = params[3]
244     dev_idx = params[4]
245     return backend.ExportSnapshot(disk, dest_node, instance,
246                                   cluster_name, dev_idx)
247
248   @staticmethod
249   def perspective_finalize_export(params):
250     """Expose the finalize export functionality.
251
252     """
253     instance = objects.Instance.FromDict(params[0])
254     snap_disks = [objects.Disk.FromDict(str_data)
255                   for str_data in params[1]]
256     return backend.FinalizeExport(instance, snap_disks)
257
258   @staticmethod
259   def perspective_export_info(params):
260     """Query information about an existing export on this node.
261
262     The given path may not contain an export, in which case we return
263     None.
264
265     """
266     path = params[0]
267     einfo = backend.ExportInfo(path)
268     if einfo is None:
269       return einfo
270     return einfo.Dumps()
271
272   @staticmethod
273   def perspective_export_list(params):
274     """List the available exports on this node.
275
276     Note that as opposed to export_info, which may query data about an
277     export in any path, this only queries the standard Ganeti path
278     (constants.EXPORT_DIR).
279
280     """
281     return backend.ListExports()
282
283   @staticmethod
284   def perspective_export_remove(params):
285     """Remove an export.
286
287     """
288     export = params[0]
289     return backend.RemoveExport(export)
290
291   # volume  --------------------------
292
293   @staticmethod
294   def perspective_volume_list(params):
295     """Query the list of logical volumes in a given volume group.
296
297     """
298     vgname = params[0]
299     return backend.GetVolumeList(vgname)
300
301   @staticmethod
302   def perspective_vg_list(params):
303     """Query the list of volume groups.
304
305     """
306     return backend.ListVolumeGroups()
307
308   # bridge  --------------------------
309
310   @staticmethod
311   def perspective_bridges_exist(params):
312     """Check if all bridges given exist on this node.
313
314     """
315     bridges_list = params[0]
316     return backend.BridgesExist(bridges_list)
317
318   # instance  --------------------------
319
320   @staticmethod
321   def perspective_instance_os_add(params):
322     """Install an OS on a given instance.
323
324     """
325     inst_s = params[0]
326     inst = objects.Instance.FromDict(inst_s)
327     return backend.AddOSToInstance(inst)
328
329   @staticmethod
330   def perspective_instance_run_rename(params):
331     """Runs the OS rename script for an instance.
332
333     """
334     inst_s, old_name = params
335     inst = objects.Instance.FromDict(inst_s)
336     return backend.RunRenameInstance(inst, old_name)
337
338   @staticmethod
339   def perspective_instance_os_import(params):
340     """Run the import function of an OS onto a given instance.
341
342     """
343     inst_s, src_node, src_images, cluster_name = params
344     inst = objects.Instance.FromDict(inst_s)
345     return backend.ImportOSIntoInstance(inst, src_node, src_images,
346                                         cluster_name)
347
348   @staticmethod
349   def perspective_instance_shutdown(params):
350     """Shutdown an instance.
351
352     """
353     instance = objects.Instance.FromDict(params[0])
354     return backend.ShutdownInstance(instance)
355
356   @staticmethod
357   def perspective_instance_start(params):
358     """Start an instance.
359
360     """
361     instance = objects.Instance.FromDict(params[0])
362     extra_args = params[1]
363     return backend.StartInstance(instance, extra_args)
364
365   @staticmethod
366   def perspective_instance_migrate(params):
367     """Migrates an instance.
368
369     """
370     instance, target, live = params
371     instance = objects.Instance.FromDict(instance)
372     return backend.MigrateInstance(instance, target, live)
373
374   @staticmethod
375   def perspective_instance_reboot(params):
376     """Reboot an instance.
377
378     """
379     instance = objects.Instance.FromDict(params[0])
380     reboot_type = params[1]
381     extra_args = params[2]
382     return backend.RebootInstance(instance, reboot_type, extra_args)
383
384   @staticmethod
385   def perspective_instance_info(params):
386     """Query instance information.
387
388     """
389     return backend.GetInstanceInfo(params[0], params[1])
390
391   @staticmethod
392   def perspective_all_instances_info(params):
393     """Query information about all instances.
394
395     """
396     return backend.GetAllInstancesInfo(params[0])
397
398   @staticmethod
399   def perspective_instance_list(params):
400     """Query the list of running instances.
401
402     """
403     return backend.GetInstanceList(params[0])
404
405   # node --------------------------
406
407   @staticmethod
408   def perspective_node_tcp_ping(params):
409     """Do a TcpPing on the remote node.
410
411     """
412     return utils.TcpPing(params[1], params[2], timeout=params[3],
413                          live_port_needed=params[4], source=params[0])
414
415   @staticmethod
416   def perspective_node_has_ip_address(params):
417     """Checks if a node has the given ip address.
418
419     """
420     return utils.OwnIpAddress(params[0])
421
422   @staticmethod
423   def perspective_node_info(params):
424     """Query node information.
425
426     """
427     vgname, hypervisor_type = params
428     return backend.GetNodeInfo(vgname, hypervisor_type)
429
430   @staticmethod
431   def perspective_node_add(params):
432     """Complete the registration of this node in the cluster.
433
434     """
435     return backend.AddNode(params[0], params[1], params[2],
436                            params[3], params[4], params[5])
437
438   @staticmethod
439   def perspective_node_verify(params):
440     """Run a verify sequence on this node.
441
442     """
443     return backend.VerifyNode(params[0], params[1])
444
445   @staticmethod
446   def perspective_node_start_master(params):
447     """Promote this node to master status.
448
449     """
450     return backend.StartMaster(params[0])
451
452   @staticmethod
453   def perspective_node_stop_master(params):
454     """Demote this node from master status.
455
456     """
457     return backend.StopMaster(params[0])
458
459   @staticmethod
460   def perspective_node_leave_cluster(params):
461     """Cleanup after leaving a cluster.
462
463     """
464     return backend.LeaveCluster()
465
466   @staticmethod
467   def perspective_node_volumes(params):
468     """Query the list of all logical volume groups.
469
470     """
471     return backend.NodeVolumes()
472
473   # cluster --------------------------
474
475   @staticmethod
476   def perspective_version(params):
477     """Query version information.
478
479     """
480     return constants.PROTOCOL_VERSION
481
482   @staticmethod
483   def perspective_upload_file(params):
484     """Upload a file.
485
486     Note that the backend implementation imposes strict rules on which
487     files are accepted.
488
489     """
490     return backend.UploadFile(*params)
491
492   @staticmethod
493   def perspective_master_info(params):
494     """Query master information.
495
496     """
497     return backend.GetMasterInfo()
498
499   # os -----------------------
500
501   @staticmethod
502   def perspective_os_diagnose(params):
503     """Query detailed information about existing OSes.
504
505     """
506     return [os.ToDict() for os in backend.DiagnoseOS()]
507
508   @staticmethod
509   def perspective_os_get(params):
510     """Query information about a given OS.
511
512     """
513     name = params[0]
514     try:
515       os_obj = backend.OSFromDisk(name)
516     except errors.InvalidOS, err:
517       os_obj = objects.OS.FromInvalidOS(err)
518     return os_obj.ToDict()
519
520   # hooks -----------------------
521
522   @staticmethod
523   def perspective_hooks_runner(params):
524     """Run hook scripts.
525
526     """
527     hpath, phase, env = params
528     hr = backend.HooksRunner()
529     return hr.RunHooks(hpath, phase, env)
530
531   # iallocator -----------------
532
533   @staticmethod
534   def perspective_iallocator_runner(params):
535     """Run an iallocator script.
536
537     """
538     name, idata = params
539     iar = backend.IAllocatorRunner()
540     return iar.Run(name, idata)
541
542   # test -----------------------
543
544   @staticmethod
545   def perspective_test_delay(params):
546     """Run test delay.
547
548     """
549     duration = params[0]
550     return utils.TestDelay(duration)
551
552   # file storage ---------------
553
554   @staticmethod
555   def perspective_file_storage_dir_create(params):
556     """Create the file storage directory.
557
558     """
559     file_storage_dir = params[0]
560     return backend.CreateFileStorageDir(file_storage_dir)
561
562   @staticmethod
563   def perspective_file_storage_dir_remove(params):
564     """Remove the file storage directory.
565
566     """
567     file_storage_dir = params[0]
568     return backend.RemoveFileStorageDir(file_storage_dir)
569
570   @staticmethod
571   def perspective_file_storage_dir_rename(params):
572     """Rename the file storage directory.
573
574     """
575     old_file_storage_dir = params[0]
576     new_file_storage_dir = params[1]
577     return backend.RenameFileStorageDir(old_file_storage_dir,
578                                         new_file_storage_dir)
579
580   # jobs ------------------------
581
582   @staticmethod
583   @_RequireJobQueueLock
584   def perspective_jobqueue_update(params):
585     """Update job queue.
586
587     """
588     (file_name, content) = params
589     return backend.JobQueueUpdate(file_name, content)
590
591   @staticmethod
592   @_RequireJobQueueLock
593   def perspective_jobqueue_purge(params):
594     """Purge job queue.
595
596     """
597     return backend.JobQueuePurge()
598
599   @staticmethod
600   @_RequireJobQueueLock
601   def perspective_jobqueue_rename(params):
602     """Rename a job queue file.
603
604     """
605     (old, new) = params
606
607     return backend.JobQueueRename(old, new)
608
609   @staticmethod
610   def perspective_jobqueue_set_drain(params):
611     """Set/unset the queue drain flag.
612
613     """
614     drain_flag = params[0]
615     return backend.JobQueueSetDrainFlag(drain_flag)
616
617
618   # hypervisor ---------------
619
620   @staticmethod
621   def perspective_hypervisor_validate_params(params):
622     """Validate the hypervisor parameters.
623
624     """
625     (hvname, hvparams) = params
626     return backend.ValidateHVParams(hvname, hvparams)
627
628
629 def ParseOptions():
630   """Parse the command line options.
631
632   Returns:
633     (options, args) as from OptionParser.parse_args()
634
635   """
636   parser = OptionParser(description="Ganeti node daemon",
637                         usage="%prog [-f] [-d]",
638                         version="%%prog (ganeti) %s" %
639                         constants.RELEASE_VERSION)
640
641   parser.add_option("-f", "--foreground", dest="fork",
642                     help="Don't detach from the current terminal",
643                     default=True, action="store_false")
644   parser.add_option("-d", "--debug", dest="debug",
645                     help="Enable some debug messages",
646                     default=False, action="store_true")
647   options, args = parser.parse_args()
648   return options, args
649
650
651 def EnsureRuntimeEnvironment():
652   """Ensure our run-time environment is complete.
653
654   Currently this creates directories which could be missing, either
655   due to directories being on a tmpfs mount, or due to incomplete
656   packaging.
657
658   """
659   dirs = [(val, 0755) for val in constants.SUB_RUN_DIRS]
660   dirs.append((constants.LOG_OS_DIR, 0750))
661   for dir_name, dir_mode in dirs:
662     if not os.path.exists(dir_name):
663       try:
664         os.mkdir(dir_name, dir_mode)
665       except EnvironmentError, err:
666         if err.errno != errno.EEXIST:
667           print ("Node setup wrong, cannot create directory '%s': %s" %
668                  (dir_name, err))
669           sys.exit(5)
670     if not os.path.isdir(dir_name):
671       print ("Node setup wrong, '%s' is not a directory" % dir_name)
672       sys.exit(5)
673
674
675 def main():
676   """Main function for the node daemon.
677
678   """
679   global queue_lock
680
681   options, args = ParseOptions()
682   utils.debug = options.debug
683   for fname in (constants.SSL_CERT_FILE,):
684     if not os.path.isfile(fname):
685       print "config %s not there, will not run." % fname
686       sys.exit(5)
687
688   try:
689     port = utils.GetNodeDaemonPort()
690     pwdata = utils.GetNodeDaemonPassword()
691   except errors.ConfigurationError, err:
692     print "Cluster configuration incomplete: '%s'" % str(err)
693     sys.exit(5)
694
695   EnsureRuntimeEnvironment()
696
697   # become a daemon
698   if options.fork:
699     utils.Daemonize(logfile=constants.LOG_NODESERVER)
700
701   utils.WritePidFile(constants.NODED_PID)
702   try:
703     utils.SetupLogging(logfile=constants.LOG_NODESERVER, debug=options.debug,
704                        stderr_logging=not options.fork)
705     logging.info("ganeti node daemon startup")
706
707     # Prepare job queue
708     queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
709
710     mainloop = daemon.Mainloop()
711     server = NodeHttpServer(mainloop, "", port)
712     server.Start()
713     try:
714       mainloop.Run()
715     finally:
716       server.Stop()
717   finally:
718     utils.RemovePidFile(constants.NODED_PID)
719
720
721 if __name__ == '__main__':
722   main()