Merge branch 'master' into next
[ganeti-local] / daemons / ganeti-noded
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti node daemon"""
23
24 # functions in this module need to have a given name structure, so:
25 # pylint: disable-msg=C0103
26
27 import os
28 import sys
29 import traceback
30 import SocketServer
31 import errno
32 import logging
33 import signal
34
35 from optparse import OptionParser
36
37 from ganeti import backend
38 from ganeti import constants
39 from ganeti import objects
40 from ganeti import errors
41 from ganeti import jstore
42 from ganeti import daemon
43 from ganeti import http
44 from ganeti import utils
45
46 import ganeti.http.server
47
48
49 queue_lock = None
50
51
52 def _RequireJobQueueLock(fn):
53   """Decorator for job queue manipulating functions.
54
55   """
56   QUEUE_LOCK_TIMEOUT = 10
57
58   def wrapper(*args, **kwargs):
59     # Locking in exclusive, blocking mode because there could be several
60     # children running at the same time. Waiting up to 10 seconds.
61     queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
62     try:
63       return fn(*args, **kwargs)
64     finally:
65       queue_lock.Unlock()
66
67   return wrapper
68
69
70 class NodeHttpServer(http.server.HttpServer):
71   """The server implementation.
72
73   This class holds all methods exposed over the RPC interface.
74
75   """
76   def __init__(self, *args, **kwargs):
77     http.server.HttpServer.__init__(self, *args, **kwargs)
78     self.noded_pid = os.getpid()
79
80   def HandleRequest(self, req):
81     """Handle a request.
82
83     """
84     if req.request_method.upper() != http.HTTP_PUT:
85       raise http.HttpBadRequest()
86
87     path = req.request_path
88     if path.startswith("/"):
89       path = path[1:]
90
91     method = getattr(self, "perspective_%s" % path, None)
92     if method is None:
93       raise http.HttpNotFound()
94
95     try:
96       try:
97         return method(req.request_body)
98       except:
99         logging.exception("Error in RPC call")
100         raise
101     except errors.QuitGanetiException, err:
102       # Tell parent to quit
103       os.kill(self.noded_pid, signal.SIGTERM)
104
105   # the new block devices  --------------------------
106
107   @staticmethod
108   def perspective_blockdev_create(params):
109     """Create a block device.
110
111     """
112     bdev_s, size, owner, on_primary, info = params
113     bdev = objects.Disk.FromDict(bdev_s)
114     if bdev is None:
115       raise ValueError("can't unserialize data!")
116     return backend.BlockdevCreate(bdev, size, owner, on_primary, info)
117
118   @staticmethod
119   def perspective_blockdev_remove(params):
120     """Remove a block device.
121
122     """
123     bdev_s = params[0]
124     bdev = objects.Disk.FromDict(bdev_s)
125     return backend.BlockdevRemove(bdev)
126
127   @staticmethod
128   def perspective_blockdev_rename(params):
129     """Remove a block device.
130
131     """
132     devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
133     return backend.BlockdevRename(devlist)
134
135   @staticmethod
136   def perspective_blockdev_assemble(params):
137     """Assemble a block device.
138
139     """
140     bdev_s, owner, on_primary = params
141     bdev = objects.Disk.FromDict(bdev_s)
142     if bdev is None:
143       raise ValueError("can't unserialize data!")
144     return backend.BlockdevAssemble(bdev, owner, on_primary)
145
146   @staticmethod
147   def perspective_blockdev_shutdown(params):
148     """Shutdown a block device.
149
150     """
151     bdev_s = params[0]
152     bdev = objects.Disk.FromDict(bdev_s)
153     if bdev is None:
154       raise ValueError("can't unserialize data!")
155     return backend.BlockdevShutdown(bdev)
156
157   @staticmethod
158   def perspective_blockdev_addchildren(params):
159     """Add a child to a mirror device.
160
161     Note: this is only valid for mirror devices. It's the caller's duty
162     to send a correct disk, otherwise we raise an error.
163
164     """
165     bdev_s, ndev_s = params
166     bdev = objects.Disk.FromDict(bdev_s)
167     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
168     if bdev is None or ndevs.count(None) > 0:
169       raise ValueError("can't unserialize data!")
170     return backend.BlockdevAddchildren(bdev, ndevs)
171
172   @staticmethod
173   def perspective_blockdev_removechildren(params):
174     """Remove a child from a mirror device.
175
176     This is only valid for mirror devices, of course. It's the callers
177     duty to send a correct disk, otherwise we raise an error.
178
179     """
180     bdev_s, ndev_s = params
181     bdev = objects.Disk.FromDict(bdev_s)
182     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
183     if bdev is None or ndevs.count(None) > 0:
184       raise ValueError("can't unserialize data!")
185     return backend.BlockdevRemovechildren(bdev, ndevs)
186
187   @staticmethod
188   def perspective_blockdev_getmirrorstatus(params):
189     """Return the mirror status for a list of disks.
190
191     """
192     disks = [objects.Disk.FromDict(dsk_s)
193             for dsk_s in params]
194     return backend.BlockdevGetmirrorstatus(disks)
195
196   @staticmethod
197   def perspective_blockdev_find(params):
198     """Expose the FindBlockDevice functionality for a disk.
199
200     This will try to find but not activate a disk.
201
202     """
203     disk = objects.Disk.FromDict(params[0])
204     return backend.BlockdevFind(disk)
205
206   @staticmethod
207   def perspective_blockdev_snapshot(params):
208     """Create a snapshot device.
209
210     Note that this is only valid for LVM disks, if we get passed
211     something else we raise an exception. The snapshot device can be
212     remove by calling the generic block device remove call.
213
214     """
215     cfbd = objects.Disk.FromDict(params[0])
216     return backend.BlockdevSnapshot(cfbd)
217
218   @staticmethod
219   def perspective_blockdev_grow(params):
220     """Grow a stack of devices.
221
222     """
223     cfbd = objects.Disk.FromDict(params[0])
224     amount = params[1]
225     return backend.BlockdevGrow(cfbd, amount)
226
227   @staticmethod
228   def perspective_blockdev_close(params):
229     """Closes the given block devices.
230
231     """
232     disks = [objects.Disk.FromDict(cf) for cf in params[1]]
233     return backend.BlockdevClose(params[0], disks)
234
235   @staticmethod
236   def perspective_blockdev_getsize(params):
237     """Compute the sizes of the given block devices.
238
239     """
240     disks = [objects.Disk.FromDict(cf) for cf in params[0]]
241     return backend.BlockdevGetsize(disks)
242
243   # blockdev/drbd specific methods ----------
244
245   @staticmethod
246   def perspective_drbd_disconnect_net(params):
247     """Disconnects the network connection of drbd disks.
248
249     Note that this is only valid for drbd disks, so the members of the
250     disk list must all be drbd devices.
251
252     """
253     nodes_ip, disks = params
254     disks = [objects.Disk.FromDict(cf) for cf in disks]
255     return backend.DrbdDisconnectNet(nodes_ip, disks)
256
257   @staticmethod
258   def perspective_drbd_attach_net(params):
259     """Attaches the network connection of drbd disks.
260
261     Note that this is only valid for drbd disks, so the members of the
262     disk list must all be drbd devices.
263
264     """
265     nodes_ip, disks, instance_name, multimaster = params
266     disks = [objects.Disk.FromDict(cf) for cf in disks]
267     return backend.DrbdAttachNet(nodes_ip, disks,
268                                      instance_name, multimaster)
269
270   @staticmethod
271   def perspective_drbd_wait_sync(params):
272     """Wait until DRBD disks are synched.
273
274     Note that this is only valid for drbd disks, so the members of the
275     disk list must all be drbd devices.
276
277     """
278     nodes_ip, disks = params
279     disks = [objects.Disk.FromDict(cf) for cf in disks]
280     return backend.DrbdWaitSync(nodes_ip, disks)
281
282   # export/import  --------------------------
283
284   @staticmethod
285   def perspective_snapshot_export(params):
286     """Export a given snapshot.
287
288     """
289     disk = objects.Disk.FromDict(params[0])
290     dest_node = params[1]
291     instance = objects.Instance.FromDict(params[2])
292     cluster_name = params[3]
293     dev_idx = params[4]
294     return backend.ExportSnapshot(disk, dest_node, instance,
295                                   cluster_name, dev_idx)
296
297   @staticmethod
298   def perspective_finalize_export(params):
299     """Expose the finalize export functionality.
300
301     """
302     instance = objects.Instance.FromDict(params[0])
303     snap_disks = [objects.Disk.FromDict(str_data)
304                   for str_data in params[1]]
305     return backend.FinalizeExport(instance, snap_disks)
306
307   @staticmethod
308   def perspective_export_info(params):
309     """Query information about an existing export on this node.
310
311     The given path may not contain an export, in which case we return
312     None.
313
314     """
315     path = params[0]
316     einfo = backend.ExportInfo(path)
317     if einfo is None:
318       return einfo
319     return einfo.Dumps()
320
321   @staticmethod
322   def perspective_export_list(params):
323     """List the available exports on this node.
324
325     Note that as opposed to export_info, which may query data about an
326     export in any path, this only queries the standard Ganeti path
327     (constants.EXPORT_DIR).
328
329     """
330     return backend.ListExports()
331
332   @staticmethod
333   def perspective_export_remove(params):
334     """Remove an export.
335
336     """
337     export = params[0]
338     return backend.RemoveExport(export)
339
340   # volume  --------------------------
341
342   @staticmethod
343   def perspective_volume_list(params):
344     """Query the list of logical volumes in a given volume group.
345
346     """
347     vgname = params[0]
348     return backend.GetVolumeList(vgname)
349
350   @staticmethod
351   def perspective_vg_list(params):
352     """Query the list of volume groups.
353
354     """
355     return backend.ListVolumeGroups()
356
357   # bridge  --------------------------
358
359   @staticmethod
360   def perspective_bridges_exist(params):
361     """Check if all bridges given exist on this node.
362
363     """
364     bridges_list = params[0]
365     return backend.BridgesExist(bridges_list)
366
367   # instance  --------------------------
368
369   @staticmethod
370   def perspective_instance_os_add(params):
371     """Install an OS on a given instance.
372
373     """
374     inst_s = params[0]
375     inst = objects.Instance.FromDict(inst_s)
376     return backend.InstanceOsAdd(inst)
377
378   @staticmethod
379   def perspective_instance_run_rename(params):
380     """Runs the OS rename script for an instance.
381
382     """
383     inst_s, old_name = params
384     inst = objects.Instance.FromDict(inst_s)
385     return backend.RunRenameInstance(inst, old_name)
386
387   @staticmethod
388   def perspective_instance_os_import(params):
389     """Run the import function of an OS onto a given instance.
390
391     """
392     inst_s, src_node, src_images, cluster_name = params
393     inst = objects.Instance.FromDict(inst_s)
394     return backend.ImportOSIntoInstance(inst, src_node, src_images,
395                                         cluster_name)
396
397   @staticmethod
398   def perspective_instance_shutdown(params):
399     """Shutdown an instance.
400
401     """
402     instance = objects.Instance.FromDict(params[0])
403     return backend.InstanceShutdown(instance)
404
405   @staticmethod
406   def perspective_instance_start(params):
407     """Start an instance.
408
409     """
410     instance = objects.Instance.FromDict(params[0])
411     return backend.StartInstance(instance)
412
413   @staticmethod
414   def perspective_migration_info(params):
415     """Gather information about an instance to be migrated.
416
417     """
418     instance = objects.Instance.FromDict(params[0])
419     return backend.MigrationInfo(instance)
420
421   @staticmethod
422   def perspective_accept_instance(params):
423     """Prepare the node to accept an instance.
424
425     """
426     instance, info, target = params
427     instance = objects.Instance.FromDict(instance)
428     return backend.AcceptInstance(instance, info, target)
429
430   @staticmethod
431   def perspective_finalize_migration(params):
432     """Finalize the instance migration.
433
434     """
435     instance, info, success = params
436     instance = objects.Instance.FromDict(instance)
437     return backend.FinalizeMigration(instance, info, success)
438
439   @staticmethod
440   def perspective_instance_migrate(params):
441     """Migrates an instance.
442
443     """
444     instance, target, live = params
445     instance = objects.Instance.FromDict(instance)
446     return backend.MigrateInstance(instance, target, live)
447
448   @staticmethod
449   def perspective_instance_reboot(params):
450     """Reboot an instance.
451
452     """
453     instance = objects.Instance.FromDict(params[0])
454     reboot_type = params[1]
455     return backend.InstanceReboot(instance, reboot_type)
456
457   @staticmethod
458   def perspective_instance_info(params):
459     """Query instance information.
460
461     """
462     return backend.GetInstanceInfo(params[0], params[1])
463
464   @staticmethod
465   def perspective_instance_migratable(params):
466     """Query whether the specified instance can be migrated.
467
468     """
469     instance = objects.Instance.FromDict(params[0])
470     return backend.GetInstanceMigratable(instance)
471
472   @staticmethod
473   def perspective_all_instances_info(params):
474     """Query information about all instances.
475
476     """
477     return backend.GetAllInstancesInfo(params[0])
478
479   @staticmethod
480   def perspective_instance_list(params):
481     """Query the list of running instances.
482
483     """
484     return backend.GetInstanceList(params[0])
485
486   # node --------------------------
487
488   @staticmethod
489   def perspective_node_tcp_ping(params):
490     """Do a TcpPing on the remote node.
491
492     """
493     return utils.TcpPing(params[1], params[2], timeout=params[3],
494                          live_port_needed=params[4], source=params[0])
495
496   @staticmethod
497   def perspective_node_has_ip_address(params):
498     """Checks if a node has the given ip address.
499
500     """
501     return utils.OwnIpAddress(params[0])
502
503   @staticmethod
504   def perspective_node_info(params):
505     """Query node information.
506
507     """
508     vgname, hypervisor_type = params
509     return backend.GetNodeInfo(vgname, hypervisor_type)
510
511   @staticmethod
512   def perspective_node_add(params):
513     """Complete the registration of this node in the cluster.
514
515     """
516     return backend.AddNode(params[0], params[1], params[2],
517                            params[3], params[4], params[5])
518
519   @staticmethod
520   def perspective_node_verify(params):
521     """Run a verify sequence on this node.
522
523     """
524     return backend.VerifyNode(params[0], params[1])
525
526   @staticmethod
527   def perspective_node_start_master(params):
528     """Promote this node to master status.
529
530     """
531     return backend.StartMaster(params[0], params[1])
532
533   @staticmethod
534   def perspective_node_stop_master(params):
535     """Demote this node from master status.
536
537     """
538     return backend.StopMaster(params[0])
539
540   @staticmethod
541   def perspective_node_leave_cluster(params):
542     """Cleanup after leaving a cluster.
543
544     """
545     return backend.LeaveCluster()
546
547   @staticmethod
548   def perspective_node_volumes(params):
549     """Query the list of all logical volume groups.
550
551     """
552     return backend.NodeVolumes()
553
554   @staticmethod
555   def perspective_node_demote_from_mc(params):
556     """Demote a node from the master candidate role.
557
558     """
559     return backend.DemoteFromMC()
560
561
562   # cluster --------------------------
563
564   @staticmethod
565   def perspective_version(params):
566     """Query version information.
567
568     """
569     return constants.PROTOCOL_VERSION
570
571   @staticmethod
572   def perspective_upload_file(params):
573     """Upload a file.
574
575     Note that the backend implementation imposes strict rules on which
576     files are accepted.
577
578     """
579     return backend.UploadFile(*params)
580
581   @staticmethod
582   def perspective_master_info(params):
583     """Query master information.
584
585     """
586     return backend.GetMasterInfo()
587
588   @staticmethod
589   def perspective_write_ssconf_files(params):
590     """Write ssconf files.
591
592     """
593     (values,) = params
594     return backend.WriteSsconfFiles(values)
595
596   # os -----------------------
597
598   @staticmethod
599   def perspective_os_diagnose(params):
600     """Query detailed information about existing OSes.
601
602     """
603     return [os_obj.ToDict() for os_obj in backend.DiagnoseOS()]
604
605   @staticmethod
606   def perspective_os_get(params):
607     """Query information about a given OS.
608
609     """
610     name = params[0]
611     try:
612       os_obj = backend.OSFromDisk(name)
613     except errors.InvalidOS, err:
614       os_obj = objects.OS.FromInvalidOS(err)
615     return os_obj.ToDict()
616
617   # hooks -----------------------
618
619   @staticmethod
620   def perspective_hooks_runner(params):
621     """Run hook scripts.
622
623     """
624     hpath, phase, env = params
625     hr = backend.HooksRunner()
626     return hr.RunHooks(hpath, phase, env)
627
628   # iallocator -----------------
629
630   @staticmethod
631   def perspective_iallocator_runner(params):
632     """Run an iallocator script.
633
634     """
635     name, idata = params
636     iar = backend.IAllocatorRunner()
637     return iar.Run(name, idata)
638
639   # test -----------------------
640
641   @staticmethod
642   def perspective_test_delay(params):
643     """Run test delay.
644
645     """
646     duration = params[0]
647     return utils.TestDelay(duration)
648
649   # file storage ---------------
650
651   @staticmethod
652   def perspective_file_storage_dir_create(params):
653     """Create the file storage directory.
654
655     """
656     file_storage_dir = params[0]
657     return backend.CreateFileStorageDir(file_storage_dir)
658
659   @staticmethod
660   def perspective_file_storage_dir_remove(params):
661     """Remove the file storage directory.
662
663     """
664     file_storage_dir = params[0]
665     return backend.RemoveFileStorageDir(file_storage_dir)
666
667   @staticmethod
668   def perspective_file_storage_dir_rename(params):
669     """Rename the file storage directory.
670
671     """
672     old_file_storage_dir = params[0]
673     new_file_storage_dir = params[1]
674     return backend.RenameFileStorageDir(old_file_storage_dir,
675                                         new_file_storage_dir)
676
677   # jobs ------------------------
678
679   @staticmethod
680   @_RequireJobQueueLock
681   def perspective_jobqueue_update(params):
682     """Update job queue.
683
684     """
685     (file_name, content) = params
686     return backend.JobQueueUpdate(file_name, content)
687
688   @staticmethod
689   @_RequireJobQueueLock
690   def perspective_jobqueue_purge(params):
691     """Purge job queue.
692
693     """
694     return backend.JobQueuePurge()
695
696   @staticmethod
697   @_RequireJobQueueLock
698   def perspective_jobqueue_rename(params):
699     """Rename a job queue file.
700
701     """
702     # TODO: What if a file fails to rename?
703     return [backend.JobQueueRename(old, new) for old, new in params]
704
705   @staticmethod
706   def perspective_jobqueue_set_drain(params):
707     """Set/unset the queue drain flag.
708
709     """
710     drain_flag = params[0]
711     return backend.JobQueueSetDrainFlag(drain_flag)
712
713
714   # hypervisor ---------------
715
716   @staticmethod
717   def perspective_hypervisor_validate_params(params):
718     """Validate the hypervisor parameters.
719
720     """
721     (hvname, hvparams) = params
722     return backend.ValidateHVParams(hvname, hvparams)
723
724
725 def ParseOptions():
726   """Parse the command line options.
727
728   @return: (options, args) as from OptionParser.parse_args()
729
730   """
731   parser = OptionParser(description="Ganeti node daemon",
732                         usage="%prog [-f] [-d] [-b ADDRESS]",
733                         version="%%prog (ganeti) %s" %
734                         constants.RELEASE_VERSION)
735
736   parser.add_option("-f", "--foreground", dest="fork",
737                     help="Don't detach from the current terminal",
738                     default=True, action="store_false")
739   parser.add_option("-d", "--debug", dest="debug",
740                     help="Enable some debug messages",
741                     default=False, action="store_true")
742   parser.add_option("-b", "--bind", dest="bind_address",
743                     help="Bind address",
744                     default="", metavar="ADDRESS")
745
746   options, args = parser.parse_args()
747   return options, args
748
749
750 def main():
751   """Main function for the node daemon.
752
753   """
754   global queue_lock
755
756   options, args = ParseOptions()
757
758   if options.fork:
759     utils.CloseFDs()
760
761   for fname in (constants.SSL_CERT_FILE,):
762     if not os.path.isfile(fname):
763       print "config %s not there, will not run." % fname
764       sys.exit(constants.EXIT_NOTCLUSTER)
765
766   port = utils.GetNodeDaemonPort()
767
768   dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
769   dirs.append((constants.LOG_OS_DIR, 0750))
770   dirs.append((constants.LOCK_DIR, 1777))
771   utils.EnsureDirs(dirs)
772
773   # become a daemon
774   if options.fork:
775     utils.Daemonize(logfile=constants.LOG_NODESERVER)
776
777   utils.WritePidFile(constants.NODED_PID)
778   try:
779     utils.SetupLogging(logfile=constants.LOG_NODESERVER, debug=options.debug,
780                        stderr_logging=not options.fork)
781     logging.info("ganeti node daemon startup")
782
783     # Read SSL certificate
784     ssl_params = http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
785                                     ssl_cert_path=constants.SSL_CERT_FILE)
786
787     # Prepare job queue
788     queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
789
790     mainloop = daemon.Mainloop()
791     server = NodeHttpServer(mainloop, options.bind_address, port,
792                             ssl_params=ssl_params, ssl_verify_peer=True)
793     server.Start()
794     try:
795       mainloop.Run()
796     finally:
797       server.Stop()
798   finally:
799     utils.RemovePidFile(constants.NODED_PID)
800
801
802 if __name__ == '__main__':
803   main()