Switch the instance_reboot rpc to (status, data)
[ganeti-local] / daemons / ganeti-noded
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti node daemon"""
23
24 # functions in this module need to have a given name structure, so:
25 # pylint: disable-msg=C0103
26
27 import os
28 import sys
29 import traceback
30 import SocketServer
31 import errno
32 import logging
33 import signal
34
35 from optparse import OptionParser
36
37 from ganeti import backend
38 from ganeti import constants
39 from ganeti import objects
40 from ganeti import errors
41 from ganeti import jstore
42 from ganeti import daemon
43 from ganeti import http
44 from ganeti import utils
45
46 import ganeti.http.server
47
48
49 queue_lock = None
50
51
52 def _RequireJobQueueLock(fn):
53   """Decorator for job queue manipulating functions.
54
55   """
56   QUEUE_LOCK_TIMEOUT = 10
57
58   def wrapper(*args, **kwargs):
59     # Locking in exclusive, blocking mode because there could be several
60     # children running at the same time. Waiting up to 10 seconds.
61     queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
62     try:
63       return fn(*args, **kwargs)
64     finally:
65       queue_lock.Unlock()
66
67   return wrapper
68
69
70 class NodeHttpServer(http.server.HttpServer):
71   """The server implementation.
72
73   This class holds all methods exposed over the RPC interface.
74
75   """
76   def __init__(self, *args, **kwargs):
77     http.server.HttpServer.__init__(self, *args, **kwargs)
78     self.noded_pid = os.getpid()
79
80   def HandleRequest(self, req):
81     """Handle a request.
82
83     """
84     if req.request_method.upper() != http.HTTP_PUT:
85       raise http.HttpBadRequest()
86
87     path = req.request_path
88     if path.startswith("/"):
89       path = path[1:]
90
91     method = getattr(self, "perspective_%s" % path, None)
92     if method is None:
93       raise http.HttpNotFound()
94
95     try:
96       try:
97         return method(req.request_body)
98       except:
99         logging.exception("Error in RPC call")
100         raise
101     except errors.QuitGanetiException, err:
102       # Tell parent to quit
103       os.kill(self.noded_pid, signal.SIGTERM)
104
105   # the new block devices  --------------------------
106
107   @staticmethod
108   def perspective_blockdev_create(params):
109     """Create a block device.
110
111     """
112     bdev_s, size, owner, on_primary, info = params
113     bdev = objects.Disk.FromDict(bdev_s)
114     if bdev is None:
115       raise ValueError("can't unserialize data!")
116     return backend.BlockdevCreate(bdev, size, owner, on_primary, info)
117
118   @staticmethod
119   def perspective_blockdev_remove(params):
120     """Remove a block device.
121
122     """
123     bdev_s = params[0]
124     bdev = objects.Disk.FromDict(bdev_s)
125     return backend.BlockdevRemove(bdev)
126
127   @staticmethod
128   def perspective_blockdev_rename(params):
129     """Remove a block device.
130
131     """
132     devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
133     return backend.BlockdevRename(devlist)
134
135   @staticmethod
136   def perspective_blockdev_assemble(params):
137     """Assemble a block device.
138
139     """
140     bdev_s, owner, on_primary = params
141     bdev = objects.Disk.FromDict(bdev_s)
142     if bdev is None:
143       raise ValueError("can't unserialize data!")
144     return backend.BlockdevAssemble(bdev, owner, on_primary)
145
146   @staticmethod
147   def perspective_blockdev_shutdown(params):
148     """Shutdown a block device.
149
150     """
151     bdev_s = params[0]
152     bdev = objects.Disk.FromDict(bdev_s)
153     if bdev is None:
154       raise ValueError("can't unserialize data!")
155     return backend.BlockdevShutdown(bdev)
156
157   @staticmethod
158   def perspective_blockdev_addchildren(params):
159     """Add a child to a mirror device.
160
161     Note: this is only valid for mirror devices. It's the caller's duty
162     to send a correct disk, otherwise we raise an error.
163
164     """
165     bdev_s, ndev_s = params
166     bdev = objects.Disk.FromDict(bdev_s)
167     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
168     if bdev is None or ndevs.count(None) > 0:
169       raise ValueError("can't unserialize data!")
170     return backend.BlockdevAddchildren(bdev, ndevs)
171
172   @staticmethod
173   def perspective_blockdev_removechildren(params):
174     """Remove a child from a mirror device.
175
176     This is only valid for mirror devices, of course. It's the callers
177     duty to send a correct disk, otherwise we raise an error.
178
179     """
180     bdev_s, ndev_s = params
181     bdev = objects.Disk.FromDict(bdev_s)
182     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
183     if bdev is None or ndevs.count(None) > 0:
184       raise ValueError("can't unserialize data!")
185     return backend.BlockdevRemovechildren(bdev, ndevs)
186
187   @staticmethod
188   def perspective_blockdev_getmirrorstatus(params):
189     """Return the mirror status for a list of disks.
190
191     """
192     disks = [objects.Disk.FromDict(dsk_s)
193             for dsk_s in params]
194     return backend.BlockdevGetmirrorstatus(disks)
195
196   @staticmethod
197   def perspective_blockdev_find(params):
198     """Expose the FindBlockDevice functionality for a disk.
199
200     This will try to find but not activate a disk.
201
202     """
203     disk = objects.Disk.FromDict(params[0])
204     return backend.BlockdevFind(disk)
205
206   @staticmethod
207   def perspective_blockdev_snapshot(params):
208     """Create a snapshot device.
209
210     Note that this is only valid for LVM disks, if we get passed
211     something else we raise an exception. The snapshot device can be
212     remove by calling the generic block device remove call.
213
214     """
215     cfbd = objects.Disk.FromDict(params[0])
216     return backend.BlockdevSnapshot(cfbd)
217
218   @staticmethod
219   def perspective_blockdev_grow(params):
220     """Grow a stack of devices.
221
222     """
223     cfbd = objects.Disk.FromDict(params[0])
224     amount = params[1]
225     return backend.BlockdevGrow(cfbd, amount)
226
227   @staticmethod
228   def perspective_blockdev_close(params):
229     """Closes the given block devices.
230
231     """
232     disks = [objects.Disk.FromDict(cf) for cf in params[1]]
233     return backend.BlockdevClose(params[0], disks)
234
235   # blockdev/drbd specific methods ----------
236
237   @staticmethod
238   def perspective_drbd_disconnect_net(params):
239     """Disconnects the network connection of drbd disks.
240
241     Note that this is only valid for drbd disks, so the members of the
242     disk list must all be drbd devices.
243
244     """
245     nodes_ip, disks = params
246     disks = [objects.Disk.FromDict(cf) for cf in disks]
247     return backend.DrbdDisconnectNet(nodes_ip, disks)
248
249   @staticmethod
250   def perspective_drbd_attach_net(params):
251     """Attaches the network connection of drbd disks.
252
253     Note that this is only valid for drbd disks, so the members of the
254     disk list must all be drbd devices.
255
256     """
257     nodes_ip, disks, instance_name, multimaster = params
258     disks = [objects.Disk.FromDict(cf) for cf in disks]
259     return backend.DrbdAttachNet(nodes_ip, disks,
260                                      instance_name, multimaster)
261
262   @staticmethod
263   def perspective_drbd_wait_sync(params):
264     """Wait until DRBD disks are synched.
265
266     Note that this is only valid for drbd disks, so the members of the
267     disk list must all be drbd devices.
268
269     """
270     nodes_ip, disks = params
271     disks = [objects.Disk.FromDict(cf) for cf in disks]
272     return backend.DrbdWaitSync(nodes_ip, disks)
273
274   # export/import  --------------------------
275
276   @staticmethod
277   def perspective_snapshot_export(params):
278     """Export a given snapshot.
279
280     """
281     disk = objects.Disk.FromDict(params[0])
282     dest_node = params[1]
283     instance = objects.Instance.FromDict(params[2])
284     cluster_name = params[3]
285     dev_idx = params[4]
286     return backend.ExportSnapshot(disk, dest_node, instance,
287                                   cluster_name, dev_idx)
288
289   @staticmethod
290   def perspective_finalize_export(params):
291     """Expose the finalize export functionality.
292
293     """
294     instance = objects.Instance.FromDict(params[0])
295     snap_disks = [objects.Disk.FromDict(str_data)
296                   for str_data in params[1]]
297     return backend.FinalizeExport(instance, snap_disks)
298
299   @staticmethod
300   def perspective_export_info(params):
301     """Query information about an existing export on this node.
302
303     The given path may not contain an export, in which case we return
304     None.
305
306     """
307     path = params[0]
308     einfo = backend.ExportInfo(path)
309     if einfo is None:
310       return einfo
311     return einfo.Dumps()
312
313   @staticmethod
314   def perspective_export_list(params):
315     """List the available exports on this node.
316
317     Note that as opposed to export_info, which may query data about an
318     export in any path, this only queries the standard Ganeti path
319     (constants.EXPORT_DIR).
320
321     """
322     return backend.ListExports()
323
324   @staticmethod
325   def perspective_export_remove(params):
326     """Remove an export.
327
328     """
329     export = params[0]
330     return backend.RemoveExport(export)
331
332   # volume  --------------------------
333
334   @staticmethod
335   def perspective_volume_list(params):
336     """Query the list of logical volumes in a given volume group.
337
338     """
339     vgname = params[0]
340     return backend.GetVolumeList(vgname)
341
342   @staticmethod
343   def perspective_vg_list(params):
344     """Query the list of volume groups.
345
346     """
347     return backend.ListVolumeGroups()
348
349   # bridge  --------------------------
350
351   @staticmethod
352   def perspective_bridges_exist(params):
353     """Check if all bridges given exist on this node.
354
355     """
356     bridges_list = params[0]
357     return backend.BridgesExist(bridges_list)
358
359   # instance  --------------------------
360
361   @staticmethod
362   def perspective_instance_os_add(params):
363     """Install an OS on a given instance.
364
365     """
366     inst_s = params[0]
367     inst = objects.Instance.FromDict(inst_s)
368     return backend.InstanceOsAdd(inst)
369
370   @staticmethod
371   def perspective_instance_run_rename(params):
372     """Runs the OS rename script for an instance.
373
374     """
375     inst_s, old_name = params
376     inst = objects.Instance.FromDict(inst_s)
377     return backend.RunRenameInstance(inst, old_name)
378
379   @staticmethod
380   def perspective_instance_os_import(params):
381     """Run the import function of an OS onto a given instance.
382
383     """
384     inst_s, src_node, src_images, cluster_name = params
385     inst = objects.Instance.FromDict(inst_s)
386     return backend.ImportOSIntoInstance(inst, src_node, src_images,
387                                         cluster_name)
388
389   @staticmethod
390   def perspective_instance_shutdown(params):
391     """Shutdown an instance.
392
393     """
394     instance = objects.Instance.FromDict(params[0])
395     return backend.ShutdownInstance(instance)
396
397   @staticmethod
398   def perspective_instance_start(params):
399     """Start an instance.
400
401     """
402     instance = objects.Instance.FromDict(params[0])
403     extra_args = params[1]
404     return backend.StartInstance(instance, extra_args)
405
406   @staticmethod
407   def perspective_migration_info(params):
408     """Gather information about an instance to be migrated.
409
410     """
411     instance = objects.Instance.FromDict(params[0])
412     return backend.MigrationInfo(instance)
413
414   @staticmethod
415   def perspective_accept_instance(params):
416     """Prepare the node to accept an instance.
417
418     """
419     instance, info, target = params
420     instance = objects.Instance.FromDict(instance)
421     return backend.AcceptInstance(instance, info, target)
422
423   @staticmethod
424   def perspective_finalize_migration(params):
425     """Finalize the instance migration.
426
427     """
428     instance, info, success = params
429     instance = objects.Instance.FromDict(instance)
430     return backend.FinalizeMigration(instance, info, success)
431
432   @staticmethod
433   def perspective_instance_migrate(params):
434     """Migrates an instance.
435
436     """
437     instance, target, live = params
438     instance = objects.Instance.FromDict(instance)
439     return backend.MigrateInstance(instance, target, live)
440
441   @staticmethod
442   def perspective_instance_reboot(params):
443     """Reboot an instance.
444
445     """
446     instance = objects.Instance.FromDict(params[0])
447     reboot_type = params[1]
448     extra_args = params[2]
449     return backend.InstanceReboot(instance, reboot_type, extra_args)
450
451   @staticmethod
452   def perspective_instance_info(params):
453     """Query instance information.
454
455     """
456     return backend.GetInstanceInfo(params[0], params[1])
457
458   @staticmethod
459   def perspective_instance_migratable(params):
460     """Query whether the specified instance can be migrated.
461
462     """
463     instance = objects.Instance.FromDict(params[0])
464     return backend.GetInstanceMigratable(instance)
465
466   @staticmethod
467   def perspective_all_instances_info(params):
468     """Query information about all instances.
469
470     """
471     return backend.GetAllInstancesInfo(params[0])
472
473   @staticmethod
474   def perspective_instance_list(params):
475     """Query the list of running instances.
476
477     """
478     return backend.GetInstanceList(params[0])
479
480   # node --------------------------
481
482   @staticmethod
483   def perspective_node_tcp_ping(params):
484     """Do a TcpPing on the remote node.
485
486     """
487     return utils.TcpPing(params[1], params[2], timeout=params[3],
488                          live_port_needed=params[4], source=params[0])
489
490   @staticmethod
491   def perspective_node_has_ip_address(params):
492     """Checks if a node has the given ip address.
493
494     """
495     return utils.OwnIpAddress(params[0])
496
497   @staticmethod
498   def perspective_node_info(params):
499     """Query node information.
500
501     """
502     vgname, hypervisor_type = params
503     return backend.GetNodeInfo(vgname, hypervisor_type)
504
505   @staticmethod
506   def perspective_node_add(params):
507     """Complete the registration of this node in the cluster.
508
509     """
510     return backend.AddNode(params[0], params[1], params[2],
511                            params[3], params[4], params[5])
512
513   @staticmethod
514   def perspective_node_verify(params):
515     """Run a verify sequence on this node.
516
517     """
518     return backend.VerifyNode(params[0], params[1])
519
520   @staticmethod
521   def perspective_node_start_master(params):
522     """Promote this node to master status.
523
524     """
525     return backend.StartMaster(params[0])
526
527   @staticmethod
528   def perspective_node_stop_master(params):
529     """Demote this node from master status.
530
531     """
532     return backend.StopMaster(params[0])
533
534   @staticmethod
535   def perspective_node_leave_cluster(params):
536     """Cleanup after leaving a cluster.
537
538     """
539     return backend.LeaveCluster()
540
541   @staticmethod
542   def perspective_node_volumes(params):
543     """Query the list of all logical volume groups.
544
545     """
546     return backend.NodeVolumes()
547
548   @staticmethod
549   def perspective_node_demote_from_mc(params):
550     """Demote a node from the master candidate role.
551
552     """
553     return backend.DemoteFromMC()
554
555
556   # cluster --------------------------
557
558   @staticmethod
559   def perspective_version(params):
560     """Query version information.
561
562     """
563     return constants.PROTOCOL_VERSION
564
565   @staticmethod
566   def perspective_upload_file(params):
567     """Upload a file.
568
569     Note that the backend implementation imposes strict rules on which
570     files are accepted.
571
572     """
573     return backend.UploadFile(*params)
574
575   @staticmethod
576   def perspective_master_info(params):
577     """Query master information.
578
579     """
580     return backend.GetMasterInfo()
581
582   @staticmethod
583   def perspective_write_ssconf_files(params):
584     """Write ssconf files.
585
586     """
587     (values,) = params
588     return backend.WriteSsconfFiles(values)
589
590   # os -----------------------
591
592   @staticmethod
593   def perspective_os_diagnose(params):
594     """Query detailed information about existing OSes.
595
596     """
597     return [os_obj.ToDict() for os_obj in backend.DiagnoseOS()]
598
599   @staticmethod
600   def perspective_os_get(params):
601     """Query information about a given OS.
602
603     """
604     name = params[0]
605     try:
606       os_obj = backend.OSFromDisk(name)
607     except errors.InvalidOS, err:
608       os_obj = objects.OS.FromInvalidOS(err)
609     return os_obj.ToDict()
610
611   # hooks -----------------------
612
613   @staticmethod
614   def perspective_hooks_runner(params):
615     """Run hook scripts.
616
617     """
618     hpath, phase, env = params
619     hr = backend.HooksRunner()
620     return hr.RunHooks(hpath, phase, env)
621
622   # iallocator -----------------
623
624   @staticmethod
625   def perspective_iallocator_runner(params):
626     """Run an iallocator script.
627
628     """
629     name, idata = params
630     iar = backend.IAllocatorRunner()
631     return iar.Run(name, idata)
632
633   # test -----------------------
634
635   @staticmethod
636   def perspective_test_delay(params):
637     """Run test delay.
638
639     """
640     duration = params[0]
641     return utils.TestDelay(duration)
642
643   # file storage ---------------
644
645   @staticmethod
646   def perspective_file_storage_dir_create(params):
647     """Create the file storage directory.
648
649     """
650     file_storage_dir = params[0]
651     return backend.CreateFileStorageDir(file_storage_dir)
652
653   @staticmethod
654   def perspective_file_storage_dir_remove(params):
655     """Remove the file storage directory.
656
657     """
658     file_storage_dir = params[0]
659     return backend.RemoveFileStorageDir(file_storage_dir)
660
661   @staticmethod
662   def perspective_file_storage_dir_rename(params):
663     """Rename the file storage directory.
664
665     """
666     old_file_storage_dir = params[0]
667     new_file_storage_dir = params[1]
668     return backend.RenameFileStorageDir(old_file_storage_dir,
669                                         new_file_storage_dir)
670
671   # jobs ------------------------
672
673   @staticmethod
674   @_RequireJobQueueLock
675   def perspective_jobqueue_update(params):
676     """Update job queue.
677
678     """
679     (file_name, content) = params
680     return backend.JobQueueUpdate(file_name, content)
681
682   @staticmethod
683   @_RequireJobQueueLock
684   def perspective_jobqueue_purge(params):
685     """Purge job queue.
686
687     """
688     return backend.JobQueuePurge()
689
690   @staticmethod
691   @_RequireJobQueueLock
692   def perspective_jobqueue_rename(params):
693     """Rename a job queue file.
694
695     """
696     # TODO: What if a file fails to rename?
697     return [backend.JobQueueRename(old, new) for old, new in params]
698
699   @staticmethod
700   def perspective_jobqueue_set_drain(params):
701     """Set/unset the queue drain flag.
702
703     """
704     drain_flag = params[0]
705     return backend.JobQueueSetDrainFlag(drain_flag)
706
707
708   # hypervisor ---------------
709
710   @staticmethod
711   def perspective_hypervisor_validate_params(params):
712     """Validate the hypervisor parameters.
713
714     """
715     (hvname, hvparams) = params
716     return backend.ValidateHVParams(hvname, hvparams)
717
718
719 def ParseOptions():
720   """Parse the command line options.
721
722   @return: (options, args) as from OptionParser.parse_args()
723
724   """
725   parser = OptionParser(description="Ganeti node daemon",
726                         usage="%prog [-f] [-d]",
727                         version="%%prog (ganeti) %s" %
728                         constants.RELEASE_VERSION)
729
730   parser.add_option("-f", "--foreground", dest="fork",
731                     help="Don't detach from the current terminal",
732                     default=True, action="store_false")
733   parser.add_option("-d", "--debug", dest="debug",
734                     help="Enable some debug messages",
735                     default=False, action="store_true")
736   options, args = parser.parse_args()
737   return options, args
738
739
740 def EnsureRuntimeEnvironment():
741   """Ensure our run-time environment is complete.
742
743   Currently this creates directories which could be missing, either
744   due to directories being on a tmpfs mount, or due to incomplete
745   packaging.
746
747   """
748   dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
749   dirs.append((constants.LOG_OS_DIR, 0750))
750   dirs.append((constants.LOCK_DIR, 1777))
751   for dir_name, dir_mode in dirs:
752     if not os.path.exists(dir_name):
753       try:
754         os.mkdir(dir_name, dir_mode)
755       except EnvironmentError, err:
756         if err.errno != errno.EEXIST:
757           print ("Node setup wrong, cannot create directory '%s': %s" %
758                  (dir_name, err))
759           sys.exit(5)
760     if not os.path.isdir(dir_name):
761       print ("Node setup wrong, '%s' is not a directory" % dir_name)
762       sys.exit(5)
763
764
765 def main():
766   """Main function for the node daemon.
767
768   """
769   global queue_lock
770
771   options, args = ParseOptions()
772   utils.debug = options.debug
773
774   if options.fork:
775     utils.CloseFDs()
776
777   for fname in (constants.SSL_CERT_FILE,):
778     if not os.path.isfile(fname):
779       print "config %s not there, will not run." % fname
780       sys.exit(5)
781
782   try:
783     port = utils.GetNodeDaemonPort()
784   except errors.ConfigurationError, err:
785     print "Cluster configuration incomplete: '%s'" % str(err)
786     sys.exit(5)
787
788   EnsureRuntimeEnvironment()
789
790   # become a daemon
791   if options.fork:
792     utils.Daemonize(logfile=constants.LOG_NODESERVER)
793
794   utils.WritePidFile(constants.NODED_PID)
795   try:
796     utils.SetupLogging(logfile=constants.LOG_NODESERVER, debug=options.debug,
797                        stderr_logging=not options.fork)
798     logging.info("ganeti node daemon startup")
799
800     # Read SSL certificate
801     ssl_params = http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
802                                     ssl_cert_path=constants.SSL_CERT_FILE)
803
804     # Prepare job queue
805     queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
806
807     mainloop = daemon.Mainloop()
808     server = NodeHttpServer(mainloop, "", port,
809                             ssl_params=ssl_params, ssl_verify_peer=True)
810     server.Start()
811     try:
812       mainloop.Run()
813     finally:
814       server.Stop()
815   finally:
816     utils.RemovePidFile(constants.NODED_PID)
817
818
819 if __name__ == '__main__':
820   main()