Merge commit 'origin/next' into branch-2.1
[ganeti-local] / daemons / ganeti-noded
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti node daemon"""
23
24 # functions in this module need to have a given name structure, so:
25 # pylint: disable-msg=C0103
26
27 import os
28 import sys
29 import SocketServer
30 import logging
31 import signal
32
33 from optparse import OptionParser
34
35 from ganeti import backend
36 from ganeti import constants
37 from ganeti import objects
38 from ganeti import errors
39 from ganeti import jstore
40 from ganeti import daemon
41 from ganeti import http
42 from ganeti import utils
43 from ganeti import storage
44
45 import ganeti.http.server
46
47
48 queue_lock = None
49
50
51 def _RequireJobQueueLock(fn):
52   """Decorator for job queue manipulating functions.
53
54   """
55   QUEUE_LOCK_TIMEOUT = 10
56
57   def wrapper(*args, **kwargs):
58     # Locking in exclusive, blocking mode because there could be several
59     # children running at the same time. Waiting up to 10 seconds.
60     queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
61     try:
62       return fn(*args, **kwargs)
63     finally:
64       queue_lock.Unlock()
65
66   return wrapper
67
68
69 class NodeHttpServer(http.server.HttpServer):
70   """The server implementation.
71
72   This class holds all methods exposed over the RPC interface.
73
74   """
75   def __init__(self, *args, **kwargs):
76     http.server.HttpServer.__init__(self, *args, **kwargs)
77     self.noded_pid = os.getpid()
78
79   def HandleRequest(self, req):
80     """Handle a request.
81
82     """
83     if req.request_method.upper() != http.HTTP_PUT:
84       raise http.HttpBadRequest()
85
86     path = req.request_path
87     if path.startswith("/"):
88       path = path[1:]
89
90     method = getattr(self, "perspective_%s" % path, None)
91     if method is None:
92       raise http.HttpNotFound()
93
94     try:
95       rvalue = method(req.request_body)
96       return True, rvalue
97
98     except backend.RPCFail, err:
99       # our custom failure exception; str(err) works fine if the
100       # exception was constructed with a single argument, and in
101       # this case, err.message == err.args[0] == str(err)
102       return (False, str(err))
103     except errors.QuitGanetiException, err:
104       # Tell parent to quit
105       logging.info("Shutting down the node daemon, arguments: %s",
106                    str(err.args))
107       os.kill(self.noded_pid, signal.SIGTERM)
108       # And return the error's arguments, which must be already in
109       # correct tuple format
110       return err.args
111     except Exception, err:
112       logging.exception("Error in RPC call")
113       return False, "Error while executing backend function: %s" % str(err)
114
115   # the new block devices  --------------------------
116
117   @staticmethod
118   def perspective_blockdev_create(params):
119     """Create a block device.
120
121     """
122     bdev_s, size, owner, on_primary, info = params
123     bdev = objects.Disk.FromDict(bdev_s)
124     if bdev is None:
125       raise ValueError("can't unserialize data!")
126     return backend.BlockdevCreate(bdev, size, owner, on_primary, info)
127
128   @staticmethod
129   def perspective_blockdev_remove(params):
130     """Remove a block device.
131
132     """
133     bdev_s = params[0]
134     bdev = objects.Disk.FromDict(bdev_s)
135     return backend.BlockdevRemove(bdev)
136
137   @staticmethod
138   def perspective_blockdev_rename(params):
139     """Remove a block device.
140
141     """
142     devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
143     return backend.BlockdevRename(devlist)
144
145   @staticmethod
146   def perspective_blockdev_assemble(params):
147     """Assemble a block device.
148
149     """
150     bdev_s, owner, on_primary = params
151     bdev = objects.Disk.FromDict(bdev_s)
152     if bdev is None:
153       raise ValueError("can't unserialize data!")
154     return backend.BlockdevAssemble(bdev, owner, on_primary)
155
156   @staticmethod
157   def perspective_blockdev_shutdown(params):
158     """Shutdown a block device.
159
160     """
161     bdev_s = params[0]
162     bdev = objects.Disk.FromDict(bdev_s)
163     if bdev is None:
164       raise ValueError("can't unserialize data!")
165     return backend.BlockdevShutdown(bdev)
166
167   @staticmethod
168   def perspective_blockdev_addchildren(params):
169     """Add a child to a mirror device.
170
171     Note: this is only valid for mirror devices. It's the caller's duty
172     to send a correct disk, otherwise we raise an error.
173
174     """
175     bdev_s, ndev_s = params
176     bdev = objects.Disk.FromDict(bdev_s)
177     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
178     if bdev is None or ndevs.count(None) > 0:
179       raise ValueError("can't unserialize data!")
180     return backend.BlockdevAddchildren(bdev, ndevs)
181
182   @staticmethod
183   def perspective_blockdev_removechildren(params):
184     """Remove a child from a mirror device.
185
186     This is only valid for mirror devices, of course. It's the callers
187     duty to send a correct disk, otherwise we raise an error.
188
189     """
190     bdev_s, ndev_s = params
191     bdev = objects.Disk.FromDict(bdev_s)
192     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
193     if bdev is None or ndevs.count(None) > 0:
194       raise ValueError("can't unserialize data!")
195     return backend.BlockdevRemovechildren(bdev, ndevs)
196
197   @staticmethod
198   def perspective_blockdev_getmirrorstatus(params):
199     """Return the mirror status for a list of disks.
200
201     """
202     disks = [objects.Disk.FromDict(dsk_s)
203              for dsk_s in params]
204     return [status.ToDict()
205             for status in backend.BlockdevGetmirrorstatus(disks)]
206
207   @staticmethod
208   def perspective_blockdev_find(params):
209     """Expose the FindBlockDevice functionality for a disk.
210
211     This will try to find but not activate a disk.
212
213     """
214     disk = objects.Disk.FromDict(params[0])
215
216     result = backend.BlockdevFind(disk)
217     if result is None:
218       return None
219
220     return result.ToDict()
221
222   @staticmethod
223   def perspective_blockdev_snapshot(params):
224     """Create a snapshot device.
225
226     Note that this is only valid for LVM disks, if we get passed
227     something else we raise an exception. The snapshot device can be
228     remove by calling the generic block device remove call.
229
230     """
231     cfbd = objects.Disk.FromDict(params[0])
232     return backend.BlockdevSnapshot(cfbd)
233
234   @staticmethod
235   def perspective_blockdev_grow(params):
236     """Grow a stack of devices.
237
238     """
239     cfbd = objects.Disk.FromDict(params[0])
240     amount = params[1]
241     return backend.BlockdevGrow(cfbd, amount)
242
243   @staticmethod
244   def perspective_blockdev_close(params):
245     """Closes the given block devices.
246
247     """
248     disks = [objects.Disk.FromDict(cf) for cf in params[1]]
249     return backend.BlockdevClose(params[0], disks)
250
251   @staticmethod
252   def perspective_blockdev_getsize(params):
253     """Compute the sizes of the given block devices.
254
255     """
256     disks = [objects.Disk.FromDict(cf) for cf in params[0]]
257     return backend.BlockdevGetsize(disks)
258
259   # blockdev/drbd specific methods ----------
260
261   @staticmethod
262   def perspective_drbd_disconnect_net(params):
263     """Disconnects the network connection of drbd disks.
264
265     Note that this is only valid for drbd disks, so the members of the
266     disk list must all be drbd devices.
267
268     """
269     nodes_ip, disks = params
270     disks = [objects.Disk.FromDict(cf) for cf in disks]
271     return backend.DrbdDisconnectNet(nodes_ip, disks)
272
273   @staticmethod
274   def perspective_drbd_attach_net(params):
275     """Attaches the network connection of drbd disks.
276
277     Note that this is only valid for drbd disks, so the members of the
278     disk list must all be drbd devices.
279
280     """
281     nodes_ip, disks, instance_name, multimaster = params
282     disks = [objects.Disk.FromDict(cf) for cf in disks]
283     return backend.DrbdAttachNet(nodes_ip, disks,
284                                      instance_name, multimaster)
285
286   @staticmethod
287   def perspective_drbd_wait_sync(params):
288     """Wait until DRBD disks are synched.
289
290     Note that this is only valid for drbd disks, so the members of the
291     disk list must all be drbd devices.
292
293     """
294     nodes_ip, disks = params
295     disks = [objects.Disk.FromDict(cf) for cf in disks]
296     return backend.DrbdWaitSync(nodes_ip, disks)
297
298   # export/import  --------------------------
299
300   @staticmethod
301   def perspective_snapshot_export(params):
302     """Export a given snapshot.
303
304     """
305     disk = objects.Disk.FromDict(params[0])
306     dest_node = params[1]
307     instance = objects.Instance.FromDict(params[2])
308     cluster_name = params[3]
309     dev_idx = params[4]
310     return backend.ExportSnapshot(disk, dest_node, instance,
311                                   cluster_name, dev_idx)
312
313   @staticmethod
314   def perspective_finalize_export(params):
315     """Expose the finalize export functionality.
316
317     """
318     instance = objects.Instance.FromDict(params[0])
319     snap_disks = [objects.Disk.FromDict(str_data)
320                   for str_data in params[1]]
321     return backend.FinalizeExport(instance, snap_disks)
322
323   @staticmethod
324   def perspective_export_info(params):
325     """Query information about an existing export on this node.
326
327     The given path may not contain an export, in which case we return
328     None.
329
330     """
331     path = params[0]
332     return backend.ExportInfo(path)
333
334   @staticmethod
335   def perspective_export_list(params):
336     """List the available exports on this node.
337
338     Note that as opposed to export_info, which may query data about an
339     export in any path, this only queries the standard Ganeti path
340     (constants.EXPORT_DIR).
341
342     """
343     return backend.ListExports()
344
345   @staticmethod
346   def perspective_export_remove(params):
347     """Remove an export.
348
349     """
350     export = params[0]
351     return backend.RemoveExport(export)
352
353   # volume  --------------------------
354
355   @staticmethod
356   def perspective_lv_list(params):
357     """Query the list of logical volumes in a given volume group.
358
359     """
360     vgname = params[0]
361     return backend.GetVolumeList(vgname)
362
363   @staticmethod
364   def perspective_vg_list(params):
365     """Query the list of volume groups.
366
367     """
368     return backend.ListVolumeGroups()
369
370   # Storage --------------------------
371
372   @staticmethod
373   def perspective_storage_list(params):
374     """Get list of storage units.
375
376     """
377     (su_name, su_args, name, fields) = params
378     return storage.GetStorage(su_name, *su_args).List(name, fields)
379
380   @staticmethod
381   def perspective_storage_modify(params):
382     """Modify a storage unit.
383
384     """
385     (su_name, su_args, name, changes) = params
386     return storage.GetStorage(su_name, *su_args).Modify(name, changes)
387
388   # bridge  --------------------------
389
390   @staticmethod
391   def perspective_bridges_exist(params):
392     """Check if all bridges given exist on this node.
393
394     """
395     bridges_list = params[0]
396     return backend.BridgesExist(bridges_list)
397
398   # instance  --------------------------
399
400   @staticmethod
401   def perspective_instance_os_add(params):
402     """Install an OS on a given instance.
403
404     """
405     inst_s = params[0]
406     inst = objects.Instance.FromDict(inst_s)
407     reinstall = params[1]
408     return backend.InstanceOsAdd(inst, reinstall)
409
410   @staticmethod
411   def perspective_instance_run_rename(params):
412     """Runs the OS rename script for an instance.
413
414     """
415     inst_s, old_name = params
416     inst = objects.Instance.FromDict(inst_s)
417     return backend.RunRenameInstance(inst, old_name)
418
419   @staticmethod
420   def perspective_instance_os_import(params):
421     """Run the import function of an OS onto a given instance.
422
423     """
424     inst_s, src_node, src_images, cluster_name = params
425     inst = objects.Instance.FromDict(inst_s)
426     return backend.ImportOSIntoInstance(inst, src_node, src_images,
427                                         cluster_name)
428
429   @staticmethod
430   def perspective_instance_shutdown(params):
431     """Shutdown an instance.
432
433     """
434     instance = objects.Instance.FromDict(params[0])
435     return backend.InstanceShutdown(instance)
436
437   @staticmethod
438   def perspective_instance_start(params):
439     """Start an instance.
440
441     """
442     instance = objects.Instance.FromDict(params[0])
443     return backend.StartInstance(instance)
444
445   @staticmethod
446   def perspective_migration_info(params):
447     """Gather information about an instance to be migrated.
448
449     """
450     instance = objects.Instance.FromDict(params[0])
451     return backend.MigrationInfo(instance)
452
453   @staticmethod
454   def perspective_accept_instance(params):
455     """Prepare the node to accept an instance.
456
457     """
458     instance, info, target = params
459     instance = objects.Instance.FromDict(instance)
460     return backend.AcceptInstance(instance, info, target)
461
462   @staticmethod
463   def perspective_finalize_migration(params):
464     """Finalize the instance migration.
465
466     """
467     instance, info, success = params
468     instance = objects.Instance.FromDict(instance)
469     return backend.FinalizeMigration(instance, info, success)
470
471   @staticmethod
472   def perspective_instance_migrate(params):
473     """Migrates an instance.
474
475     """
476     instance, target, live = params
477     instance = objects.Instance.FromDict(instance)
478     return backend.MigrateInstance(instance, target, live)
479
480   @staticmethod
481   def perspective_instance_reboot(params):
482     """Reboot an instance.
483
484     """
485     instance = objects.Instance.FromDict(params[0])
486     reboot_type = params[1]
487     return backend.InstanceReboot(instance, reboot_type)
488
489   @staticmethod
490   def perspective_instance_info(params):
491     """Query instance information.
492
493     """
494     return backend.GetInstanceInfo(params[0], params[1])
495
496   @staticmethod
497   def perspective_instance_migratable(params):
498     """Query whether the specified instance can be migrated.
499
500     """
501     instance = objects.Instance.FromDict(params[0])
502     return backend.GetInstanceMigratable(instance)
503
504   @staticmethod
505   def perspective_all_instances_info(params):
506     """Query information about all instances.
507
508     """
509     return backend.GetAllInstancesInfo(params[0])
510
511   @staticmethod
512   def perspective_instance_list(params):
513     """Query the list of running instances.
514
515     """
516     return backend.GetInstanceList(params[0])
517
518   # node --------------------------
519
520   @staticmethod
521   def perspective_node_tcp_ping(params):
522     """Do a TcpPing on the remote node.
523
524     """
525     return utils.TcpPing(params[1], params[2], timeout=params[3],
526                          live_port_needed=params[4], source=params[0])
527
528   @staticmethod
529   def perspective_node_has_ip_address(params):
530     """Checks if a node has the given ip address.
531
532     """
533     return utils.OwnIpAddress(params[0])
534
535   @staticmethod
536   def perspective_node_info(params):
537     """Query node information.
538
539     """
540     vgname, hypervisor_type = params
541     return backend.GetNodeInfo(vgname, hypervisor_type)
542
543   @staticmethod
544   def perspective_node_add(params):
545     """Complete the registration of this node in the cluster.
546
547     """
548     return backend.AddNode(params[0], params[1], params[2],
549                            params[3], params[4], params[5])
550
551   @staticmethod
552   def perspective_node_verify(params):
553     """Run a verify sequence on this node.
554
555     """
556     return backend.VerifyNode(params[0], params[1])
557
558   @staticmethod
559   def perspective_node_start_master(params):
560     """Promote this node to master status.
561
562     """
563     return backend.StartMaster(params[0], params[1])
564
565   @staticmethod
566   def perspective_node_stop_master(params):
567     """Demote this node from master status.
568
569     """
570     return backend.StopMaster(params[0])
571
572   @staticmethod
573   def perspective_node_leave_cluster(params):
574     """Cleanup after leaving a cluster.
575
576     """
577     return backend.LeaveCluster()
578
579   @staticmethod
580   def perspective_node_volumes(params):
581     """Query the list of all logical volume groups.
582
583     """
584     return backend.NodeVolumes()
585
586   @staticmethod
587   def perspective_node_demote_from_mc(params):
588     """Demote a node from the master candidate role.
589
590     """
591     return backend.DemoteFromMC()
592
593
594   @staticmethod
595   def perspective_node_powercycle(params):
596     """Tries to powercycle the nod.
597
598     """
599     hypervisor_type = params[0]
600     return backend.PowercycleNode(hypervisor_type)
601
602
603   # cluster --------------------------
604
605   @staticmethod
606   def perspective_version(params):
607     """Query version information.
608
609     """
610     return constants.PROTOCOL_VERSION
611
612   @staticmethod
613   def perspective_upload_file(params):
614     """Upload a file.
615
616     Note that the backend implementation imposes strict rules on which
617     files are accepted.
618
619     """
620     return backend.UploadFile(*params)
621
622   @staticmethod
623   def perspective_master_info(params):
624     """Query master information.
625
626     """
627     return backend.GetMasterInfo()
628
629   @staticmethod
630   def perspective_write_ssconf_files(params):
631     """Write ssconf files.
632
633     """
634     (values,) = params
635     return backend.WriteSsconfFiles(values)
636
637   # os -----------------------
638
639   @staticmethod
640   def perspective_os_diagnose(params):
641     """Query detailed information about existing OSes.
642
643     """
644     return backend.DiagnoseOS()
645
646   @staticmethod
647   def perspective_os_get(params):
648     """Query information about a given OS.
649
650     """
651     name = params[0]
652     os_obj = backend.OSFromDisk(name)
653     return os_obj.ToDict()
654
655   # hooks -----------------------
656
657   @staticmethod
658   def perspective_hooks_runner(params):
659     """Run hook scripts.
660
661     """
662     hpath, phase, env = params
663     hr = backend.HooksRunner()
664     return hr.RunHooks(hpath, phase, env)
665
666   # iallocator -----------------
667
668   @staticmethod
669   def perspective_iallocator_runner(params):
670     """Run an iallocator script.
671
672     """
673     name, idata = params
674     iar = backend.IAllocatorRunner()
675     return iar.Run(name, idata)
676
677   # test -----------------------
678
679   @staticmethod
680   def perspective_test_delay(params):
681     """Run test delay.
682
683     """
684     duration = params[0]
685     status, rval = utils.TestDelay(duration)
686     if not status:
687       raise backend.RPCFail(rval)
688     return rval
689
690   # file storage ---------------
691
692   @staticmethod
693   def perspective_file_storage_dir_create(params):
694     """Create the file storage directory.
695
696     """
697     file_storage_dir = params[0]
698     return backend.CreateFileStorageDir(file_storage_dir)
699
700   @staticmethod
701   def perspective_file_storage_dir_remove(params):
702     """Remove the file storage directory.
703
704     """
705     file_storage_dir = params[0]
706     return backend.RemoveFileStorageDir(file_storage_dir)
707
708   @staticmethod
709   def perspective_file_storage_dir_rename(params):
710     """Rename the file storage directory.
711
712     """
713     old_file_storage_dir = params[0]
714     new_file_storage_dir = params[1]
715     return backend.RenameFileStorageDir(old_file_storage_dir,
716                                         new_file_storage_dir)
717
718   # jobs ------------------------
719
720   @staticmethod
721   @_RequireJobQueueLock
722   def perspective_jobqueue_update(params):
723     """Update job queue.
724
725     """
726     (file_name, content) = params
727     return backend.JobQueueUpdate(file_name, content)
728
729   @staticmethod
730   @_RequireJobQueueLock
731   def perspective_jobqueue_purge(params):
732     """Purge job queue.
733
734     """
735     return backend.JobQueuePurge()
736
737   @staticmethod
738   @_RequireJobQueueLock
739   def perspective_jobqueue_rename(params):
740     """Rename a job queue file.
741
742     """
743     # TODO: What if a file fails to rename?
744     return [backend.JobQueueRename(old, new) for old, new in params]
745
746   @staticmethod
747   def perspective_jobqueue_set_drain(params):
748     """Set/unset the queue drain flag.
749
750     """
751     drain_flag = params[0]
752     return backend.JobQueueSetDrainFlag(drain_flag)
753
754
755   # hypervisor ---------------
756
757   @staticmethod
758   def perspective_hypervisor_validate_params(params):
759     """Validate the hypervisor parameters.
760
761     """
762     (hvname, hvparams) = params
763     return backend.ValidateHVParams(hvname, hvparams)
764
765
766 def ExecNODED(options, args):
767   """Main NODED function, executed with the pidfile held.
768
769   """
770   global queue_lock
771
772   # Read SSL certificate
773   if options.ssl:
774     ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
775                                     ssl_cert_path=options.ssl_cert)
776   else:
777     ssl_params = None
778
779   # Prepare job queue
780   queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
781
782   mainloop = daemon.Mainloop()
783   server = NodeHttpServer(mainloop, options.bind_address, options.port,
784                           ssl_params=ssl_params, ssl_verify_peer=True)
785   server.Start()
786   try:
787     mainloop.Run()
788   finally:
789     server.Stop()
790
791
792 def main():
793   """Main function for the node daemon.
794
795   """
796   parser = OptionParser(description="Ganeti node daemon",
797                         usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
798                         version="%%prog (ganeti) %s" %
799                         constants.RELEASE_VERSION)
800   dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
801   dirs.append((constants.LOG_OS_DIR, 0750))
802   dirs.append((constants.LOCK_DIR, 1777))
803   daemon.GenericMain(constants.NODED, parser, dirs, None, ExecNODED)
804
805
806 if __name__ == '__main__':
807   main()