Convert node_has_ip_address rpc to new style
[ganeti-local] / daemons / ganeti-noded
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti node daemon"""
23
24 # functions in this module need to have a given name structure, so:
25 # pylint: disable-msg=C0103
26
27 import os
28 import sys
29 import traceback
30 import SocketServer
31 import errno
32 import logging
33 import signal
34
35 from optparse import OptionParser
36
37 from ganeti import backend
38 from ganeti import constants
39 from ganeti import objects
40 from ganeti import errors
41 from ganeti import jstore
42 from ganeti import daemon
43 from ganeti import http
44 from ganeti import utils
45
46 import ganeti.http.server
47
48
49 queue_lock = None
50
51
52 def _RequireJobQueueLock(fn):
53   """Decorator for job queue manipulating functions.
54
55   """
56   QUEUE_LOCK_TIMEOUT = 10
57
58   def wrapper(*args, **kwargs):
59     # Locking in exclusive, blocking mode because there could be several
60     # children running at the same time. Waiting up to 10 seconds.
61     queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
62     try:
63       return fn(*args, **kwargs)
64     finally:
65       queue_lock.Unlock()
66
67   return wrapper
68
69
70 class NodeHttpServer(http.server.HttpServer):
71   """The server implementation.
72
73   This class holds all methods exposed over the RPC interface.
74
75   """
76   def __init__(self, *args, **kwargs):
77     http.server.HttpServer.__init__(self, *args, **kwargs)
78     self.noded_pid = os.getpid()
79
80   def HandleRequest(self, req):
81     """Handle a request.
82
83     """
84     if req.request_method.upper() != http.HTTP_PUT:
85       raise http.HttpBadRequest()
86
87     path = req.request_path
88     if path.startswith("/"):
89       path = path[1:]
90
91     method = getattr(self, "perspective_%s" % path, None)
92     if method is None:
93       raise http.HttpNotFound()
94
95     try:
96       try:
97         return method(req.request_body)
98       except backend.RPCFail, err:
99         # our custom failure exception; str(err) works fine if the
100         # exception was constructed with a single argument, and in
101         # this case, err.message == err.args[0] == str(err)
102         return (False, str(err))
103       except:
104         logging.exception("Error in RPC call")
105         raise
106     except errors.QuitGanetiException, err:
107       # Tell parent to quit
108       os.kill(self.noded_pid, signal.SIGTERM)
109
110   # the new block devices  --------------------------
111
112   @staticmethod
113   def perspective_blockdev_create(params):
114     """Create a block device.
115
116     """
117     bdev_s, size, owner, on_primary, info = params
118     bdev = objects.Disk.FromDict(bdev_s)
119     if bdev is None:
120       raise ValueError("can't unserialize data!")
121     return backend.BlockdevCreate(bdev, size, owner, on_primary, info)
122
123   @staticmethod
124   def perspective_blockdev_remove(params):
125     """Remove a block device.
126
127     """
128     bdev_s = params[0]
129     bdev = objects.Disk.FromDict(bdev_s)
130     return backend.BlockdevRemove(bdev)
131
132   @staticmethod
133   def perspective_blockdev_rename(params):
134     """Remove a block device.
135
136     """
137     devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
138     return backend.BlockdevRename(devlist)
139
140   @staticmethod
141   def perspective_blockdev_assemble(params):
142     """Assemble a block device.
143
144     """
145     bdev_s, owner, on_primary = params
146     bdev = objects.Disk.FromDict(bdev_s)
147     if bdev is None:
148       raise ValueError("can't unserialize data!")
149     return backend.BlockdevAssemble(bdev, owner, on_primary)
150
151   @staticmethod
152   def perspective_blockdev_shutdown(params):
153     """Shutdown a block device.
154
155     """
156     bdev_s = params[0]
157     bdev = objects.Disk.FromDict(bdev_s)
158     if bdev is None:
159       raise ValueError("can't unserialize data!")
160     return backend.BlockdevShutdown(bdev)
161
162   @staticmethod
163   def perspective_blockdev_addchildren(params):
164     """Add a child to a mirror device.
165
166     Note: this is only valid for mirror devices. It's the caller's duty
167     to send a correct disk, otherwise we raise an error.
168
169     """
170     bdev_s, ndev_s = params
171     bdev = objects.Disk.FromDict(bdev_s)
172     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
173     if bdev is None or ndevs.count(None) > 0:
174       raise ValueError("can't unserialize data!")
175     return backend.BlockdevAddchildren(bdev, ndevs)
176
177   @staticmethod
178   def perspective_blockdev_removechildren(params):
179     """Remove a child from a mirror device.
180
181     This is only valid for mirror devices, of course. It's the callers
182     duty to send a correct disk, otherwise we raise an error.
183
184     """
185     bdev_s, ndev_s = params
186     bdev = objects.Disk.FromDict(bdev_s)
187     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
188     if bdev is None or ndevs.count(None) > 0:
189       raise ValueError("can't unserialize data!")
190     return backend.BlockdevRemovechildren(bdev, ndevs)
191
192   @staticmethod
193   def perspective_blockdev_getmirrorstatus(params):
194     """Return the mirror status for a list of disks.
195
196     """
197     disks = [objects.Disk.FromDict(dsk_s)
198             for dsk_s in params]
199     return backend.BlockdevGetmirrorstatus(disks)
200
201   @staticmethod
202   def perspective_blockdev_find(params):
203     """Expose the FindBlockDevice functionality for a disk.
204
205     This will try to find but not activate a disk.
206
207     """
208     disk = objects.Disk.FromDict(params[0])
209     return backend.BlockdevFind(disk)
210
211   @staticmethod
212   def perspective_blockdev_snapshot(params):
213     """Create a snapshot device.
214
215     Note that this is only valid for LVM disks, if we get passed
216     something else we raise an exception. The snapshot device can be
217     remove by calling the generic block device remove call.
218
219     """
220     cfbd = objects.Disk.FromDict(params[0])
221     return backend.BlockdevSnapshot(cfbd)
222
223   @staticmethod
224   def perspective_blockdev_grow(params):
225     """Grow a stack of devices.
226
227     """
228     cfbd = objects.Disk.FromDict(params[0])
229     amount = params[1]
230     return backend.BlockdevGrow(cfbd, amount)
231
232   @staticmethod
233   def perspective_blockdev_close(params):
234     """Closes the given block devices.
235
236     """
237     disks = [objects.Disk.FromDict(cf) for cf in params[1]]
238     return backend.BlockdevClose(params[0], disks)
239
240   # blockdev/drbd specific methods ----------
241
242   @staticmethod
243   def perspective_drbd_disconnect_net(params):
244     """Disconnects the network connection of drbd disks.
245
246     Note that this is only valid for drbd disks, so the members of the
247     disk list must all be drbd devices.
248
249     """
250     nodes_ip, disks = params
251     disks = [objects.Disk.FromDict(cf) for cf in disks]
252     return backend.DrbdDisconnectNet(nodes_ip, disks)
253
254   @staticmethod
255   def perspective_drbd_attach_net(params):
256     """Attaches the network connection of drbd disks.
257
258     Note that this is only valid for drbd disks, so the members of the
259     disk list must all be drbd devices.
260
261     """
262     nodes_ip, disks, instance_name, multimaster = params
263     disks = [objects.Disk.FromDict(cf) for cf in disks]
264     return backend.DrbdAttachNet(nodes_ip, disks,
265                                      instance_name, multimaster)
266
267   @staticmethod
268   def perspective_drbd_wait_sync(params):
269     """Wait until DRBD disks are synched.
270
271     Note that this is only valid for drbd disks, so the members of the
272     disk list must all be drbd devices.
273
274     """
275     nodes_ip, disks = params
276     disks = [objects.Disk.FromDict(cf) for cf in disks]
277     return backend.DrbdWaitSync(nodes_ip, disks)
278
279   # export/import  --------------------------
280
281   @staticmethod
282   def perspective_snapshot_export(params):
283     """Export a given snapshot.
284
285     """
286     disk = objects.Disk.FromDict(params[0])
287     dest_node = params[1]
288     instance = objects.Instance.FromDict(params[2])
289     cluster_name = params[3]
290     dev_idx = params[4]
291     return backend.ExportSnapshot(disk, dest_node, instance,
292                                   cluster_name, dev_idx)
293
294   @staticmethod
295   def perspective_finalize_export(params):
296     """Expose the finalize export functionality.
297
298     """
299     instance = objects.Instance.FromDict(params[0])
300     snap_disks = [objects.Disk.FromDict(str_data)
301                   for str_data in params[1]]
302     return backend.FinalizeExport(instance, snap_disks)
303
304   @staticmethod
305   def perspective_export_info(params):
306     """Query information about an existing export on this node.
307
308     The given path may not contain an export, in which case we return
309     None.
310
311     """
312     path = params[0]
313     return backend.ExportInfo(path)
314
315   @staticmethod
316   def perspective_export_list(params):
317     """List the available exports on this node.
318
319     Note that as opposed to export_info, which may query data about an
320     export in any path, this only queries the standard Ganeti path
321     (constants.EXPORT_DIR).
322
323     """
324     return backend.ListExports()
325
326   @staticmethod
327   def perspective_export_remove(params):
328     """Remove an export.
329
330     """
331     export = params[0]
332     return backend.RemoveExport(export)
333
334   # volume  --------------------------
335
336   @staticmethod
337   def perspective_volume_list(params):
338     """Query the list of logical volumes in a given volume group.
339
340     """
341     vgname = params[0]
342     return True, backend.GetVolumeList(vgname)
343
344   @staticmethod
345   def perspective_vg_list(params):
346     """Query the list of volume groups.
347
348     """
349     return backend.ListVolumeGroups()
350
351   # bridge  --------------------------
352
353   @staticmethod
354   def perspective_bridges_exist(params):
355     """Check if all bridges given exist on this node.
356
357     """
358     bridges_list = params[0]
359     return backend.BridgesExist(bridges_list)
360
361   # instance  --------------------------
362
363   @staticmethod
364   def perspective_instance_os_add(params):
365     """Install an OS on a given instance.
366
367     """
368     inst_s = params[0]
369     inst = objects.Instance.FromDict(inst_s)
370     reinstall = params[1]
371     return backend.InstanceOsAdd(inst, reinstall)
372
373   @staticmethod
374   def perspective_instance_run_rename(params):
375     """Runs the OS rename script for an instance.
376
377     """
378     inst_s, old_name = params
379     inst = objects.Instance.FromDict(inst_s)
380     return backend.RunRenameInstance(inst, old_name)
381
382   @staticmethod
383   def perspective_instance_os_import(params):
384     """Run the import function of an OS onto a given instance.
385
386     """
387     inst_s, src_node, src_images, cluster_name = params
388     inst = objects.Instance.FromDict(inst_s)
389     return backend.ImportOSIntoInstance(inst, src_node, src_images,
390                                         cluster_name)
391
392   @staticmethod
393   def perspective_instance_shutdown(params):
394     """Shutdown an instance.
395
396     """
397     instance = objects.Instance.FromDict(params[0])
398     return backend.InstanceShutdown(instance)
399
400   @staticmethod
401   def perspective_instance_start(params):
402     """Start an instance.
403
404     """
405     instance = objects.Instance.FromDict(params[0])
406     return backend.StartInstance(instance)
407
408   @staticmethod
409   def perspective_migration_info(params):
410     """Gather information about an instance to be migrated.
411
412     """
413     instance = objects.Instance.FromDict(params[0])
414     return backend.MigrationInfo(instance)
415
416   @staticmethod
417   def perspective_accept_instance(params):
418     """Prepare the node to accept an instance.
419
420     """
421     instance, info, target = params
422     instance = objects.Instance.FromDict(instance)
423     return backend.AcceptInstance(instance, info, target)
424
425   @staticmethod
426   def perspective_finalize_migration(params):
427     """Finalize the instance migration.
428
429     """
430     instance, info, success = params
431     instance = objects.Instance.FromDict(instance)
432     return backend.FinalizeMigration(instance, info, success)
433
434   @staticmethod
435   def perspective_instance_migrate(params):
436     """Migrates an instance.
437
438     """
439     instance, target, live = params
440     instance = objects.Instance.FromDict(instance)
441     return backend.MigrateInstance(instance, target, live)
442
443   @staticmethod
444   def perspective_instance_reboot(params):
445     """Reboot an instance.
446
447     """
448     instance = objects.Instance.FromDict(params[0])
449     reboot_type = params[1]
450     return backend.InstanceReboot(instance, reboot_type)
451
452   @staticmethod
453   def perspective_instance_info(params):
454     """Query instance information.
455
456     """
457     return backend.GetInstanceInfo(params[0], params[1])
458
459   @staticmethod
460   def perspective_instance_migratable(params):
461     """Query whether the specified instance can be migrated.
462
463     """
464     instance = objects.Instance.FromDict(params[0])
465     return backend.GetInstanceMigratable(instance)
466
467   @staticmethod
468   def perspective_all_instances_info(params):
469     """Query information about all instances.
470
471     """
472     return backend.GetAllInstancesInfo(params[0])
473
474   @staticmethod
475   def perspective_instance_list(params):
476     """Query the list of running instances.
477
478     """
479     return True, backend.GetInstanceList(params[0])
480
481   # node --------------------------
482
483   @staticmethod
484   def perspective_node_tcp_ping(params):
485     """Do a TcpPing on the remote node.
486
487     """
488     return utils.TcpPing(params[1], params[2], timeout=params[3],
489                          live_port_needed=params[4], source=params[0])
490
491   @staticmethod
492   def perspective_node_has_ip_address(params):
493     """Checks if a node has the given ip address.
494
495     """
496     return True, utils.OwnIpAddress(params[0])
497
498   @staticmethod
499   def perspective_node_info(params):
500     """Query node information.
501
502     """
503     vgname, hypervisor_type = params
504     return backend.GetNodeInfo(vgname, hypervisor_type)
505
506   @staticmethod
507   def perspective_node_add(params):
508     """Complete the registration of this node in the cluster.
509
510     """
511     return backend.AddNode(params[0], params[1], params[2],
512                            params[3], params[4], params[5])
513
514   @staticmethod
515   def perspective_node_verify(params):
516     """Run a verify sequence on this node.
517
518     """
519     return backend.VerifyNode(params[0], params[1])
520
521   @staticmethod
522   def perspective_node_start_master(params):
523     """Promote this node to master status.
524
525     """
526     return backend.StartMaster(params[0])
527
528   @staticmethod
529   def perspective_node_stop_master(params):
530     """Demote this node from master status.
531
532     """
533     return backend.StopMaster(params[0])
534
535   @staticmethod
536   def perspective_node_leave_cluster(params):
537     """Cleanup after leaving a cluster.
538
539     """
540     return backend.LeaveCluster()
541
542   @staticmethod
543   def perspective_node_volumes(params):
544     """Query the list of all logical volume groups.
545
546     """
547     return backend.NodeVolumes()
548
549   @staticmethod
550   def perspective_node_demote_from_mc(params):
551     """Demote a node from the master candidate role.
552
553     """
554     return backend.DemoteFromMC()
555
556
557   @staticmethod
558   def perspective_node_powercycle(params):
559     """Tries to powercycle the nod.
560
561     """
562     hypervisor_type = params[0]
563     return backend.PowercycleNode(hypervisor_type)
564
565
566   # cluster --------------------------
567
568   @staticmethod
569   def perspective_version(params):
570     """Query version information.
571
572     """
573     return constants.PROTOCOL_VERSION
574
575   @staticmethod
576   def perspective_upload_file(params):
577     """Upload a file.
578
579     Note that the backend implementation imposes strict rules on which
580     files are accepted.
581
582     """
583     return backend.UploadFile(*params)
584
585   @staticmethod
586   def perspective_master_info(params):
587     """Query master information.
588
589     """
590     return backend.GetMasterInfo()
591
592   @staticmethod
593   def perspective_write_ssconf_files(params):
594     """Write ssconf files.
595
596     """
597     (values,) = params
598     return backend.WriteSsconfFiles(values)
599
600   # os -----------------------
601
602   @staticmethod
603   def perspective_os_diagnose(params):
604     """Query detailed information about existing OSes.
605
606     """
607     return [os_obj.ToDict() for os_obj in backend.DiagnoseOS()]
608
609   @staticmethod
610   def perspective_os_get(params):
611     """Query information about a given OS.
612
613     """
614     name = params[0]
615     try:
616       os_obj = backend.OSFromDisk(name)
617     except errors.InvalidOS, err:
618       os_obj = objects.OS.FromInvalidOS(err)
619     return os_obj.ToDict()
620
621   # hooks -----------------------
622
623   @staticmethod
624   def perspective_hooks_runner(params):
625     """Run hook scripts.
626
627     """
628     hpath, phase, env = params
629     hr = backend.HooksRunner()
630     return hr.RunHooks(hpath, phase, env)
631
632   # iallocator -----------------
633
634   @staticmethod
635   def perspective_iallocator_runner(params):
636     """Run an iallocator script.
637
638     """
639     name, idata = params
640     iar = backend.IAllocatorRunner()
641     return iar.Run(name, idata)
642
643   # test -----------------------
644
645   @staticmethod
646   def perspective_test_delay(params):
647     """Run test delay.
648
649     """
650     duration = params[0]
651     return utils.TestDelay(duration)
652
653   # file storage ---------------
654
655   @staticmethod
656   def perspective_file_storage_dir_create(params):
657     """Create the file storage directory.
658
659     """
660     file_storage_dir = params[0]
661     return backend.CreateFileStorageDir(file_storage_dir)
662
663   @staticmethod
664   def perspective_file_storage_dir_remove(params):
665     """Remove the file storage directory.
666
667     """
668     file_storage_dir = params[0]
669     return backend.RemoveFileStorageDir(file_storage_dir)
670
671   @staticmethod
672   def perspective_file_storage_dir_rename(params):
673     """Rename the file storage directory.
674
675     """
676     old_file_storage_dir = params[0]
677     new_file_storage_dir = params[1]
678     return backend.RenameFileStorageDir(old_file_storage_dir,
679                                         new_file_storage_dir)
680
681   # jobs ------------------------
682
683   @staticmethod
684   @_RequireJobQueueLock
685   def perspective_jobqueue_update(params):
686     """Update job queue.
687
688     """
689     (file_name, content) = params
690     return backend.JobQueueUpdate(file_name, content)
691
692   @staticmethod
693   @_RequireJobQueueLock
694   def perspective_jobqueue_purge(params):
695     """Purge job queue.
696
697     """
698     return backend.JobQueuePurge()
699
700   @staticmethod
701   @_RequireJobQueueLock
702   def perspective_jobqueue_rename(params):
703     """Rename a job queue file.
704
705     """
706     # TODO: What if a file fails to rename?
707     return [backend.JobQueueRename(old, new) for old, new in params]
708
709   @staticmethod
710   def perspective_jobqueue_set_drain(params):
711     """Set/unset the queue drain flag.
712
713     """
714     drain_flag = params[0]
715     return backend.JobQueueSetDrainFlag(drain_flag)
716
717
718   # hypervisor ---------------
719
720   @staticmethod
721   def perspective_hypervisor_validate_params(params):
722     """Validate the hypervisor parameters.
723
724     """
725     (hvname, hvparams) = params
726     return backend.ValidateHVParams(hvname, hvparams)
727
728
729 def ParseOptions():
730   """Parse the command line options.
731
732   @return: (options, args) as from OptionParser.parse_args()
733
734   """
735   parser = OptionParser(description="Ganeti node daemon",
736                         usage="%prog [-f] [-d] [-b ADDRESS]",
737                         version="%%prog (ganeti) %s" %
738                         constants.RELEASE_VERSION)
739
740   parser.add_option("-f", "--foreground", dest="fork",
741                     help="Don't detach from the current terminal",
742                     default=True, action="store_false")
743   parser.add_option("-d", "--debug", dest="debug",
744                     help="Enable some debug messages",
745                     default=False, action="store_true")
746   parser.add_option("-b", "--bind", dest="bind_address",
747                     help="Bind address",
748                     default="", metavar="ADDRESS")
749
750   options, args = parser.parse_args()
751   return options, args
752
753
754 def main():
755   """Main function for the node daemon.
756
757   """
758   global queue_lock
759
760   options, args = ParseOptions()
761   utils.debug = options.debug
762
763   if options.fork:
764     utils.CloseFDs()
765
766   for fname in (constants.SSL_CERT_FILE,):
767     if not os.path.isfile(fname):
768       print "config %s not there, will not run." % fname
769       sys.exit(5)
770
771   try:
772     port = utils.GetNodeDaemonPort()
773   except errors.ConfigurationError, err:
774     print "Cluster configuration incomplete: '%s'" % str(err)
775     sys.exit(5)
776
777   dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
778   dirs.append((constants.LOG_OS_DIR, 0750))
779   dirs.append((constants.LOCK_DIR, 1777))
780   utils.EnsureDirs(dirs)
781
782   # become a daemon
783   if options.fork:
784     utils.Daemonize(logfile=constants.LOG_NODESERVER)
785
786   utils.WritePidFile(constants.NODED_PID)
787   try:
788     utils.SetupLogging(logfile=constants.LOG_NODESERVER, debug=options.debug,
789                        stderr_logging=not options.fork)
790     logging.info("ganeti node daemon startup")
791
792     # Read SSL certificate
793     ssl_params = http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
794                                     ssl_cert_path=constants.SSL_CERT_FILE)
795
796     # Prepare job queue
797     queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
798
799     mainloop = daemon.Mainloop()
800     server = NodeHttpServer(mainloop, options.bind_address, port,
801                             ssl_params=ssl_params, ssl_verify_peer=True)
802     server.Start()
803     try:
804       mainloop.Run()
805     finally:
806       server.Stop()
807   finally:
808     utils.RemovePidFile(constants.NODED_PID)
809
810
811 if __name__ == '__main__':
812   main()