backend.py change to get cluster name from master
[ganeti-local] / daemons / ganeti-noded
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti node daemon"""
23
24 # functions in this module need to have a given name structure, so:
25 # pylint: disable-msg=C0103
26
27 import os
28 import sys
29 import traceback
30 import SocketServer
31 import errno
32 import logging
33 import signal
34
35 from optparse import OptionParser
36
37 from ganeti import backend
38 from ganeti import logger
39 from ganeti import constants
40 from ganeti import objects
41 from ganeti import errors
42 from ganeti import jstore
43 from ganeti import http
44 from ganeti import utils
45
46
47 queue_lock = None
48
49
50 def _RequireJobQueueLock(fn):
51   """Decorator for job queue manipulating functions.
52
53   """
54   QUEUE_LOCK_TIMEOUT = 10
55
56   def wrapper(*args, **kwargs):
57     # Locking in exclusive, blocking mode because there could be several
58     # children running at the same time. Waiting up to 10 seconds.
59     queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
60     try:
61       return fn(*args, **kwargs)
62     finally:
63       queue_lock.Unlock()
64
65   return wrapper
66
67
68 class NodeDaemonRequestHandler(http.HTTPRequestHandler):
69   """The server implementation.
70
71   This class holds all methods exposed over the RPC interface.
72
73   """
74   def HandleRequest(self):
75     """Handle a request.
76
77     """
78     if self.command.upper() != "PUT":
79       raise http.HTTPBadRequest()
80
81     path = self.path
82     if path.startswith("/"):
83       path = path[1:]
84
85     method = getattr(self, "perspective_%s" % path, None)
86     if method is None:
87       raise httperror.HTTPNotFound()
88
89     try:
90       try:
91         return method(self.post_data)
92       except:
93         logging.exception("Error in RPC call")
94         raise
95     except errors.QuitGanetiException, err:
96       # Tell parent to quit
97       os.kill(self.server.noded_pid, signal.SIGTERM)
98
99   # the new block devices  --------------------------
100
101   @staticmethod
102   def perspective_blockdev_create(params):
103     """Create a block device.
104
105     """
106     bdev_s, size, owner, on_primary, info = params
107     bdev = objects.Disk.FromDict(bdev_s)
108     if bdev is None:
109       raise ValueError("can't unserialize data!")
110     return backend.CreateBlockDevice(bdev, size, owner, on_primary, info)
111
112   @staticmethod
113   def perspective_blockdev_remove(params):
114     """Remove a block device.
115
116     """
117     bdev_s = params[0]
118     bdev = objects.Disk.FromDict(bdev_s)
119     return backend.RemoveBlockDevice(bdev)
120
121   @staticmethod
122   def perspective_blockdev_rename(params):
123     """Remove a block device.
124
125     """
126     devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
127     return backend.RenameBlockDevices(devlist)
128
129   @staticmethod
130   def perspective_blockdev_assemble(params):
131     """Assemble a block device.
132
133     """
134     bdev_s, owner, on_primary = params
135     bdev = objects.Disk.FromDict(bdev_s)
136     if bdev is None:
137       raise ValueError("can't unserialize data!")
138     return backend.AssembleBlockDevice(bdev, owner, on_primary)
139
140   @staticmethod
141   def perspective_blockdev_shutdown(params):
142     """Shutdown a block device.
143
144     """
145     bdev_s = params[0]
146     bdev = objects.Disk.FromDict(bdev_s)
147     if bdev is None:
148       raise ValueError("can't unserialize data!")
149     return backend.ShutdownBlockDevice(bdev)
150
151   @staticmethod
152   def perspective_blockdev_addchildren(params):
153     """Add a child to a mirror device.
154
155     Note: this is only valid for mirror devices. It's the caller's duty
156     to send a correct disk, otherwise we raise an error.
157
158     """
159     bdev_s, ndev_s = params
160     bdev = objects.Disk.FromDict(bdev_s)
161     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
162     if bdev is None or ndevs.count(None) > 0:
163       raise ValueError("can't unserialize data!")
164     return backend.MirrorAddChildren(bdev, ndevs)
165
166   @staticmethod
167   def perspective_blockdev_removechildren(params):
168     """Remove a child from a mirror device.
169
170     This is only valid for mirror devices, of course. It's the callers
171     duty to send a correct disk, otherwise we raise an error.
172
173     """
174     bdev_s, ndev_s = params
175     bdev = objects.Disk.FromDict(bdev_s)
176     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
177     if bdev is None or ndevs.count(None) > 0:
178       raise ValueError("can't unserialize data!")
179     return backend.MirrorRemoveChildren(bdev, ndevs)
180
181   @staticmethod
182   def perspective_blockdev_getmirrorstatus(params):
183     """Return the mirror status for a list of disks.
184
185     """
186     disks = [objects.Disk.FromDict(dsk_s)
187             for dsk_s in params]
188     return backend.GetMirrorStatus(disks)
189
190   @staticmethod
191   def perspective_blockdev_find(params):
192     """Expose the FindBlockDevice functionality for a disk.
193
194     This will try to find but not activate a disk.
195
196     """
197     disk = objects.Disk.FromDict(params[0])
198     return backend.FindBlockDevice(disk)
199
200   @staticmethod
201   def perspective_blockdev_snapshot(params):
202     """Create a snapshot device.
203
204     Note that this is only valid for LVM disks, if we get passed
205     something else we raise an exception. The snapshot device can be
206     remove by calling the generic block device remove call.
207
208     """
209     cfbd = objects.Disk.FromDict(params[0])
210     return backend.SnapshotBlockDevice(cfbd)
211
212   @staticmethod
213   def perspective_blockdev_grow(params):
214     """Grow a stack of devices.
215
216     """
217     cfbd = objects.Disk.FromDict(params[0])
218     amount = params[1]
219     return backend.GrowBlockDevice(cfbd, amount)
220
221   @staticmethod
222   def perspective_blockdev_close(params):
223     """Closes the given block devices.
224
225     """
226     disks = [objects.Disk.FromDict(cf) for cf in params]
227     return backend.CloseBlockDevices(disks)
228
229   # export/import  --------------------------
230
231   @staticmethod
232   def perspective_snapshot_export(params):
233     """Export a given snapshot.
234
235     """
236     disk = objects.Disk.FromDict(params[0])
237     dest_node = params[1]
238     instance = objects.Instance.FromDict(params[2])
239     cluster_name = params[3]
240     return backend.ExportSnapshot(disk, dest_node, instance, cluster_name)
241
242   @staticmethod
243   def perspective_finalize_export(params):
244     """Expose the finalize export functionality.
245
246     """
247     instance = objects.Instance.FromDict(params[0])
248     snap_disks = [objects.Disk.FromDict(str_data)
249                   for str_data in params[1]]
250     return backend.FinalizeExport(instance, snap_disks)
251
252   @staticmethod
253   def perspective_export_info(params):
254     """Query information about an existing export on this node.
255
256     The given path may not contain an export, in which case we return
257     None.
258
259     """
260     path = params[0]
261     einfo = backend.ExportInfo(path)
262     if einfo is None:
263       return einfo
264     return einfo.Dumps()
265
266   @staticmethod
267   def perspective_export_list(params):
268     """List the available exports on this node.
269
270     Note that as opposed to export_info, which may query data about an
271     export in any path, this only queries the standard Ganeti path
272     (constants.EXPORT_DIR).
273
274     """
275     return backend.ListExports()
276
277   @staticmethod
278   def perspective_export_remove(params):
279     """Remove an export.
280
281     """
282     export = params[0]
283     return backend.RemoveExport(export)
284
285   # volume  --------------------------
286
287   @staticmethod
288   def perspective_volume_list(params):
289     """Query the list of logical volumes in a given volume group.
290
291     """
292     vgname = params[0]
293     return backend.GetVolumeList(vgname)
294
295   @staticmethod
296   def perspective_vg_list(params):
297     """Query the list of volume groups.
298
299     """
300     return backend.ListVolumeGroups()
301
302   # bridge  --------------------------
303
304   @staticmethod
305   def perspective_bridges_exist(params):
306     """Check if all bridges given exist on this node.
307
308     """
309     bridges_list = params[0]
310     return backend.BridgesExist(bridges_list)
311
312   # instance  --------------------------
313
314   @staticmethod
315   def perspective_instance_os_add(params):
316     """Install an OS on a given instance.
317
318     """
319     inst_s, os_disk, swap_disk = params
320     inst = objects.Instance.FromDict(inst_s)
321     return backend.AddOSToInstance(inst, os_disk, swap_disk)
322
323   @staticmethod
324   def perspective_instance_run_rename(params):
325     """Runs the OS rename script for an instance.
326
327     """
328     inst_s, old_name, os_disk, swap_disk = params
329     inst = objects.Instance.FromDict(inst_s)
330     return backend.RunRenameInstance(inst, old_name, os_disk, swap_disk)
331
332   @staticmethod
333   def perspective_instance_os_import(params):
334     """Run the import function of an OS onto a given instance.
335
336     """
337     inst_s, os_disk, swap_disk, src_node, src_image, cluster_name = params
338     inst = objects.Instance.FromDict(inst_s)
339     return backend.ImportOSIntoInstance(inst, os_disk, swap_disk,
340                                         src_node, src_image, cluster_name)
341
342   @staticmethod
343   def perspective_instance_shutdown(params):
344     """Shutdown an instance.
345
346     """
347     instance = objects.Instance.FromDict(params[0])
348     return backend.ShutdownInstance(instance)
349
350   @staticmethod
351   def perspective_instance_start(params):
352     """Start an instance.
353
354     """
355     instance = objects.Instance.FromDict(params[0])
356     extra_args = params[1]
357     return backend.StartInstance(instance, extra_args)
358
359   @staticmethod
360   def perspective_instance_migrate(params):
361     """Migrates an instance.
362
363     """
364     instance, target, live = params
365     return backend.MigrateInstance(instance, target, live)
366
367   @staticmethod
368   def perspective_instance_reboot(params):
369     """Reboot an instance.
370
371     """
372     instance = objects.Instance.FromDict(params[0])
373     reboot_type = params[1]
374     extra_args = params[2]
375     return backend.RebootInstance(instance, reboot_type, extra_args)
376
377   @staticmethod
378   def perspective_instance_info(params):
379     """Query instance information.
380
381     """
382     return backend.GetInstanceInfo(params[0])
383
384   @staticmethod
385   def perspective_all_instances_info(params):
386     """Query information about all instances.
387
388     """
389     return backend.GetAllInstancesInfo()
390
391   @staticmethod
392   def perspective_instance_list(params):
393     """Query the list of running instances.
394
395     """
396     return backend.GetInstanceList()
397
398   # node --------------------------
399
400   @staticmethod
401   def perspective_node_tcp_ping(params):
402     """Do a TcpPing on the remote node.
403
404     """
405     return utils.TcpPing(params[1], params[2], timeout=params[3],
406                          live_port_needed=params[4], source=params[0])
407
408   @staticmethod
409   def perspective_node_info(params):
410     """Query node information.
411
412     """
413     vgname = params[0]
414     return backend.GetNodeInfo(vgname)
415
416   @staticmethod
417   def perspective_node_add(params):
418     """Complete the registration of this node in the cluster.
419
420     """
421     return backend.AddNode(params[0], params[1], params[2],
422                            params[3], params[4], params[5])
423
424   @staticmethod
425   def perspective_node_verify(params):
426     """Run a verify sequence on this node.
427
428     """
429     return backend.VerifyNode(params[0], params[1])
430
431   @staticmethod
432   def perspective_node_start_master(params):
433     """Promote this node to master status.
434
435     """
436     return backend.StartMaster(params[0])
437
438   @staticmethod
439   def perspective_node_stop_master(params):
440     """Demote this node from master status.
441
442     """
443     return backend.StopMaster(params[0])
444
445   @staticmethod
446   def perspective_node_leave_cluster(params):
447     """Cleanup after leaving a cluster.
448
449     """
450     return backend.LeaveCluster()
451
452   @staticmethod
453   def perspective_node_volumes(params):
454     """Query the list of all logical volume groups.
455
456     """
457     return backend.NodeVolumes()
458
459   # cluster --------------------------
460
461   @staticmethod
462   def perspective_version(params):
463     """Query version information.
464
465     """
466     return constants.PROTOCOL_VERSION
467
468   @staticmethod
469   def perspective_upload_file(params):
470     """Upload a file.
471
472     Note that the backend implementation imposes strict rules on which
473     files are accepted.
474
475     """
476     return backend.UploadFile(*params)
477
478   @staticmethod
479   def perspective_master_info(params):
480     """Query master information.
481
482     """
483     return backend.GetMasterInfo()
484
485   # os -----------------------
486
487   @staticmethod
488   def perspective_os_diagnose(params):
489     """Query detailed information about existing OSes.
490
491     """
492     return [os.ToDict() for os in backend.DiagnoseOS()]
493
494   @staticmethod
495   def perspective_os_get(params):
496     """Query information about a given OS.
497
498     """
499     name = params[0]
500     try:
501       os_obj = backend.OSFromDisk(name)
502     except errors.InvalidOS, err:
503       os_obj = objects.OS.FromInvalidOS(err)
504     return os_obj.ToDict()
505
506   # hooks -----------------------
507
508   @staticmethod
509   def perspective_hooks_runner(params):
510     """Run hook scripts.
511
512     """
513     hpath, phase, env = params
514     hr = backend.HooksRunner()
515     return hr.RunHooks(hpath, phase, env)
516
517   # iallocator -----------------
518
519   @staticmethod
520   def perspective_iallocator_runner(params):
521     """Run an iallocator script.
522
523     """
524     name, idata = params
525     iar = backend.IAllocatorRunner()
526     return iar.Run(name, idata)
527
528   # test -----------------------
529
530   @staticmethod
531   def perspective_test_delay(params):
532     """Run test delay.
533
534     """
535     duration = params[0]
536     return utils.TestDelay(duration)
537
538   # file storage ---------------
539
540   @staticmethod
541   def perspective_file_storage_dir_create(params):
542     """Create the file storage directory.
543
544     """
545     file_storage_dir = params[0]
546     return backend.CreateFileStorageDir(file_storage_dir)
547
548   @staticmethod
549   def perspective_file_storage_dir_remove(params):
550     """Remove the file storage directory.
551
552     """
553     file_storage_dir = params[0]
554     return backend.RemoveFileStorageDir(file_storage_dir)
555
556   @staticmethod
557   def perspective_file_storage_dir_rename(params):
558     """Rename the file storage directory.
559
560     """
561     old_file_storage_dir = params[0]
562     new_file_storage_dir = params[1]
563     return backend.RenameFileStorageDir(old_file_storage_dir,
564                                         new_file_storage_dir)
565
566   # jobs ------------------------
567
568   @staticmethod
569   @_RequireJobQueueLock
570   def perspective_jobqueue_update(params):
571     """Update job queue.
572
573     """
574     (file_name, content) = params
575     return backend.JobQueueUpdate(file_name, content)
576
577   @staticmethod
578   @_RequireJobQueueLock
579   def perspective_jobqueue_purge(params):
580     """Purge job queue.
581
582     """
583     return backend.JobQueuePurge()
584
585   @staticmethod
586   @_RequireJobQueueLock
587   def perspective_jobqueue_rename(params):
588     """Rename a job queue file.
589
590     """
591     (old, new) = params
592
593     return backend.JobQueueRename(old, new)
594
595
596 class NodeDaemonHttpServer(http.HTTPServer):
597   def __init__(self, server_address):
598     http.HTTPServer.__init__(self, server_address, NodeDaemonRequestHandler)
599     self.noded_pid = os.getpid()
600
601   def serve_forever(self):
602     """Handle requests until told to quit."""
603     sighandler = utils.SignalHandler([signal.SIGINT, signal.SIGTERM])
604     try:
605       while not sighandler.called:
606         self.handle_request()
607       # TODO: There could be children running at this point
608     finally:
609       sighandler.Reset()
610
611
612 class ForkingHTTPServer(SocketServer.ForkingMixIn, NodeDaemonHttpServer):
613   """Forking HTTP Server.
614
615   This inherits from ForkingMixIn and HTTPServer in order to fork for each
616   request we handle. This allows more requests to be handled concurrently.
617
618   """
619
620
621 def ParseOptions():
622   """Parse the command line options.
623
624   Returns:
625     (options, args) as from OptionParser.parse_args()
626
627   """
628   parser = OptionParser(description="Ganeti node daemon",
629                         usage="%prog [-f] [-d]",
630                         version="%%prog (ganeti) %s" %
631                         constants.RELEASE_VERSION)
632
633   parser.add_option("-f", "--foreground", dest="fork",
634                     help="Don't detach from the current terminal",
635                     default=True, action="store_false")
636   parser.add_option("-d", "--debug", dest="debug",
637                     help="Enable some debug messages",
638                     default=False, action="store_true")
639   options, args = parser.parse_args()
640   return options, args
641
642
643 def main():
644   """Main function for the node daemon.
645
646   """
647   global queue_lock
648
649   options, args = ParseOptions()
650   utils.debug = options.debug
651   for fname in (constants.SSL_CERT_FILE,):
652     if not os.path.isfile(fname):
653       print "config %s not there, will not run." % fname
654       sys.exit(5)
655
656   try:
657     port = utils.GetNodeDaemonPort()
658     pwdata = utils.GetNodeDaemonPassword()
659   except errors.ConfigurationError, err:
660     print "Cluster configuration incomplete: '%s'" % str(err)
661     sys.exit(5)
662
663   # create the various SUB_RUN_DIRS, if not existing, so that we handle the
664   # situation where RUN_DIR is tmpfs
665   for dir_name in constants.SUB_RUN_DIRS:
666     if not os.path.exists(dir_name):
667       try:
668         os.mkdir(dir_name, 0755)
669       except EnvironmentError, err:
670         if err.errno != errno.EEXIST:
671           print ("Node setup wrong, cannot create directory %s: %s" %
672                  (dir_name, err))
673           sys.exit(5)
674     if not os.path.isdir(dir_name):
675       print ("Node setup wrong, %s is not a directory" % dir_name)
676       sys.exit(5)
677
678   # become a daemon
679   if options.fork:
680     utils.Daemonize(logfile=constants.LOG_NODESERVER)
681
682   utils.WritePidFile(constants.NODED_PID)
683
684   logger.SetupLogging(logfile=constants.LOG_NODESERVER, debug=options.debug,
685                       stderr_logging=not options.fork)
686   logging.info("ganeti node daemon startup")
687
688   # Prepare job queue
689   queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
690
691   if options.fork:
692     server = ForkingHTTPServer(('', port))
693   else:
694     server = NodeDaemonHttpServer(('', port))
695
696   try:
697     server.serve_forever()
698   finally:
699     utils.RemovePidFile(constants.NODED_PID)
700
701
702 if __name__ == '__main__':
703   main()