Ship default kvm-ifup script
[ganeti-local] / lib / server / noded.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti node daemon"""
23
24 # pylint: disable-msg=C0103,W0142
25
26 # C0103: Functions in this module need to have a given name structure,
27 # and the name of the daemon doesn't match
28
29 # W0142: Used * or ** magic, since we do use it extensively in this
30 # module
31
32 import os
33 import sys
34 import logging
35 import signal
36
37 from optparse import OptionParser
38
39 from ganeti import backend
40 from ganeti import constants
41 from ganeti import objects
42 from ganeti import errors
43 from ganeti import jstore
44 from ganeti import daemon
45 from ganeti import http
46 from ganeti import utils
47 from ganeti import storage
48 from ganeti import serializer
49 from ganeti import netutils
50
51 import ganeti.http.server # pylint: disable-msg=W0611
52
53
54 queue_lock = None
55
56
57 def _PrepareQueueLock():
58   """Try to prepare the queue lock.
59
60   @return: None for success, otherwise an exception object
61
62   """
63   global queue_lock # pylint: disable-msg=W0603
64
65   if queue_lock is not None:
66     return None
67
68   # Prepare job queue
69   try:
70     queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
71     return None
72   except EnvironmentError, err:
73     return err
74
75
76 def _RequireJobQueueLock(fn):
77   """Decorator for job queue manipulating functions.
78
79   """
80   QUEUE_LOCK_TIMEOUT = 10
81
82   def wrapper(*args, **kwargs):
83     # Locking in exclusive, blocking mode because there could be several
84     # children running at the same time. Waiting up to 10 seconds.
85     if _PrepareQueueLock() is not None:
86       raise errors.JobQueueError("Job queue failed initialization,"
87                                  " cannot update jobs")
88     queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
89     try:
90       return fn(*args, **kwargs)
91     finally:
92       queue_lock.Unlock()
93
94   return wrapper
95
96
97 def _DecodeImportExportIO(ieio, ieioargs):
98   """Decodes import/export I/O information.
99
100   """
101   if ieio == constants.IEIO_RAW_DISK:
102     assert len(ieioargs) == 1
103     return (objects.Disk.FromDict(ieioargs[0]), )
104
105   if ieio == constants.IEIO_SCRIPT:
106     assert len(ieioargs) == 2
107     return (objects.Disk.FromDict(ieioargs[0]), ieioargs[1])
108
109   return ieioargs
110
111
112 class MlockallRequestExecutor(http.server.HttpServerRequestExecutor):
113   """Custom Request Executor class that ensures NodeHttpServer children are
114   locked in ram.
115
116   """
117   def __init__(self, *args, **kwargs):
118     utils.Mlockall()
119
120     http.server.HttpServerRequestExecutor.__init__(self, *args, **kwargs)
121
122
123 class NodeHttpServer(http.server.HttpServer):
124   """The server implementation.
125
126   This class holds all methods exposed over the RPC interface.
127
128   """
129   # too many public methods, and unused args - all methods get params
130   # due to the API
131   # pylint: disable-msg=R0904,W0613
132   def __init__(self, *args, **kwargs):
133     http.server.HttpServer.__init__(self, *args, **kwargs)
134     self.noded_pid = os.getpid()
135
136   def HandleRequest(self, req):
137     """Handle a request.
138
139     """
140     if req.request_method.upper() != http.HTTP_PUT:
141       raise http.HttpBadRequest()
142
143     path = req.request_path
144     if path.startswith("/"):
145       path = path[1:]
146
147     method = getattr(self, "perspective_%s" % path, None)
148     if method is None:
149       raise http.HttpNotFound()
150
151     try:
152       result = (True, method(serializer.LoadJson(req.request_body)))
153
154     except backend.RPCFail, err:
155       # our custom failure exception; str(err) works fine if the
156       # exception was constructed with a single argument, and in
157       # this case, err.message == err.args[0] == str(err)
158       result = (False, str(err))
159     except errors.QuitGanetiException, err:
160       # Tell parent to quit
161       logging.info("Shutting down the node daemon, arguments: %s",
162                    str(err.args))
163       os.kill(self.noded_pid, signal.SIGTERM)
164       # And return the error's arguments, which must be already in
165       # correct tuple format
166       result = err.args
167     except Exception, err:
168       logging.exception("Error in RPC call")
169       result = (False, "Error while executing backend function: %s" % str(err))
170
171     return serializer.DumpJson(result, indent=False)
172
173   # the new block devices  --------------------------
174
175   @staticmethod
176   def perspective_blockdev_create(params):
177     """Create a block device.
178
179     """
180     bdev_s, size, owner, on_primary, info = params
181     bdev = objects.Disk.FromDict(bdev_s)
182     if bdev is None:
183       raise ValueError("can't unserialize data!")
184     return backend.BlockdevCreate(bdev, size, owner, on_primary, info)
185
186   @staticmethod
187   def perspective_blockdev_pause_resume_sync(params):
188     """Pause/resume sync of a block device.
189
190     """
191     disks_s, pause = params
192     disks = [objects.Disk.FromDict(bdev_s) for bdev_s in disks_s]
193     return backend.BlockdevPauseResumeSync(disks, pause)
194
195   @staticmethod
196   def perspective_blockdev_wipe(params):
197     """Wipe a block device.
198
199     """
200     bdev_s, offset, size = params
201     bdev = objects.Disk.FromDict(bdev_s)
202     return backend.BlockdevWipe(bdev, offset, size)
203
204   @staticmethod
205   def perspective_blockdev_remove(params):
206     """Remove a block device.
207
208     """
209     bdev_s = params[0]
210     bdev = objects.Disk.FromDict(bdev_s)
211     return backend.BlockdevRemove(bdev)
212
213   @staticmethod
214   def perspective_blockdev_rename(params):
215     """Remove a block device.
216
217     """
218     devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
219     return backend.BlockdevRename(devlist)
220
221   @staticmethod
222   def perspective_blockdev_assemble(params):
223     """Assemble a block device.
224
225     """
226     bdev_s, owner, on_primary = params
227     bdev = objects.Disk.FromDict(bdev_s)
228     if bdev is None:
229       raise ValueError("can't unserialize data!")
230     return backend.BlockdevAssemble(bdev, owner, on_primary)
231
232   @staticmethod
233   def perspective_blockdev_shutdown(params):
234     """Shutdown a block device.
235
236     """
237     bdev_s = params[0]
238     bdev = objects.Disk.FromDict(bdev_s)
239     if bdev is None:
240       raise ValueError("can't unserialize data!")
241     return backend.BlockdevShutdown(bdev)
242
243   @staticmethod
244   def perspective_blockdev_addchildren(params):
245     """Add a child to a mirror device.
246
247     Note: this is only valid for mirror devices. It's the caller's duty
248     to send a correct disk, otherwise we raise an error.
249
250     """
251     bdev_s, ndev_s = params
252     bdev = objects.Disk.FromDict(bdev_s)
253     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
254     if bdev is None or ndevs.count(None) > 0:
255       raise ValueError("can't unserialize data!")
256     return backend.BlockdevAddchildren(bdev, ndevs)
257
258   @staticmethod
259   def perspective_blockdev_removechildren(params):
260     """Remove a child from a mirror device.
261
262     This is only valid for mirror devices, of course. It's the callers
263     duty to send a correct disk, otherwise we raise an error.
264
265     """
266     bdev_s, ndev_s = params
267     bdev = objects.Disk.FromDict(bdev_s)
268     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
269     if bdev is None or ndevs.count(None) > 0:
270       raise ValueError("can't unserialize data!")
271     return backend.BlockdevRemovechildren(bdev, ndevs)
272
273   @staticmethod
274   def perspective_blockdev_getmirrorstatus(params):
275     """Return the mirror status for a list of disks.
276
277     """
278     disks = [objects.Disk.FromDict(dsk_s)
279              for dsk_s in params]
280     return [status.ToDict()
281             for status in backend.BlockdevGetmirrorstatus(disks)]
282
283   @staticmethod
284   def perspective_blockdev_getmirrorstatus_multi(params):
285     """Return the mirror status for a list of disks.
286
287     """
288     (node_disks, ) = params
289
290     node_name = netutils.Hostname.GetSysName()
291
292     disks = [objects.Disk.FromDict(dsk_s)
293              for dsk_s in node_disks.get(node_name, [])]
294
295     result = []
296
297     for (success, status) in backend.BlockdevGetmirrorstatusMulti(disks):
298       if success:
299         result.append((success, status.ToDict()))
300       else:
301         result.append((success, status))
302
303     return result
304
305   @staticmethod
306   def perspective_blockdev_find(params):
307     """Expose the FindBlockDevice functionality for a disk.
308
309     This will try to find but not activate a disk.
310
311     """
312     disk = objects.Disk.FromDict(params[0])
313
314     result = backend.BlockdevFind(disk)
315     if result is None:
316       return None
317
318     return result.ToDict()
319
320   @staticmethod
321   def perspective_blockdev_snapshot(params):
322     """Create a snapshot device.
323
324     Note that this is only valid for LVM disks, if we get passed
325     something else we raise an exception. The snapshot device can be
326     remove by calling the generic block device remove call.
327
328     """
329     cfbd = objects.Disk.FromDict(params[0])
330     return backend.BlockdevSnapshot(cfbd)
331
332   @staticmethod
333   def perspective_blockdev_grow(params):
334     """Grow a stack of devices.
335
336     """
337     cfbd = objects.Disk.FromDict(params[0])
338     amount = params[1]
339     return backend.BlockdevGrow(cfbd, amount)
340
341   @staticmethod
342   def perspective_blockdev_close(params):
343     """Closes the given block devices.
344
345     """
346     disks = [objects.Disk.FromDict(cf) for cf in params[1]]
347     return backend.BlockdevClose(params[0], disks)
348
349   @staticmethod
350   def perspective_blockdev_getsize(params):
351     """Compute the sizes of the given block devices.
352
353     """
354     disks = [objects.Disk.FromDict(cf) for cf in params[0]]
355     return backend.BlockdevGetsize(disks)
356
357   @staticmethod
358   def perspective_blockdev_export(params):
359     """Compute the sizes of the given block devices.
360
361     """
362     disk = objects.Disk.FromDict(params[0])
363     dest_node, dest_path, cluster_name = params[1:]
364     return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)
365
366   # blockdev/drbd specific methods ----------
367
368   @staticmethod
369   def perspective_drbd_disconnect_net(params):
370     """Disconnects the network connection of drbd disks.
371
372     Note that this is only valid for drbd disks, so the members of the
373     disk list must all be drbd devices.
374
375     """
376     nodes_ip, disks = params
377     disks = [objects.Disk.FromDict(cf) for cf in disks]
378     return backend.DrbdDisconnectNet(nodes_ip, disks)
379
380   @staticmethod
381   def perspective_drbd_attach_net(params):
382     """Attaches the network connection of drbd disks.
383
384     Note that this is only valid for drbd disks, so the members of the
385     disk list must all be drbd devices.
386
387     """
388     nodes_ip, disks, instance_name, multimaster = params
389     disks = [objects.Disk.FromDict(cf) for cf in disks]
390     return backend.DrbdAttachNet(nodes_ip, disks,
391                                      instance_name, multimaster)
392
393   @staticmethod
394   def perspective_drbd_wait_sync(params):
395     """Wait until DRBD disks are synched.
396
397     Note that this is only valid for drbd disks, so the members of the
398     disk list must all be drbd devices.
399
400     """
401     nodes_ip, disks = params
402     disks = [objects.Disk.FromDict(cf) for cf in disks]
403     return backend.DrbdWaitSync(nodes_ip, disks)
404
405   @staticmethod
406   def perspective_drbd_helper(params):
407     """Query drbd helper.
408
409     """
410     return backend.GetDrbdUsermodeHelper()
411
412   # export/import  --------------------------
413
414   @staticmethod
415   def perspective_finalize_export(params):
416     """Expose the finalize export functionality.
417
418     """
419     instance = objects.Instance.FromDict(params[0])
420
421     snap_disks = []
422     for disk in params[1]:
423       if isinstance(disk, bool):
424         snap_disks.append(disk)
425       else:
426         snap_disks.append(objects.Disk.FromDict(disk))
427
428     return backend.FinalizeExport(instance, snap_disks)
429
430   @staticmethod
431   def perspective_export_info(params):
432     """Query information about an existing export on this node.
433
434     The given path may not contain an export, in which case we return
435     None.
436
437     """
438     path = params[0]
439     return backend.ExportInfo(path)
440
441   @staticmethod
442   def perspective_export_list(params):
443     """List the available exports on this node.
444
445     Note that as opposed to export_info, which may query data about an
446     export in any path, this only queries the standard Ganeti path
447     (constants.EXPORT_DIR).
448
449     """
450     return backend.ListExports()
451
452   @staticmethod
453   def perspective_export_remove(params):
454     """Remove an export.
455
456     """
457     export = params[0]
458     return backend.RemoveExport(export)
459
460   # volume  --------------------------
461
462   @staticmethod
463   def perspective_lv_list(params):
464     """Query the list of logical volumes in a given volume group.
465
466     """
467     vgname = params[0]
468     return backend.GetVolumeList(vgname)
469
470   @staticmethod
471   def perspective_vg_list(params):
472     """Query the list of volume groups.
473
474     """
475     return backend.ListVolumeGroups()
476
477   # Storage --------------------------
478
479   @staticmethod
480   def perspective_storage_list(params):
481     """Get list of storage units.
482
483     """
484     (su_name, su_args, name, fields) = params
485     return storage.GetStorage(su_name, *su_args).List(name, fields)
486
487   @staticmethod
488   def perspective_storage_modify(params):
489     """Modify a storage unit.
490
491     """
492     (su_name, su_args, name, changes) = params
493     return storage.GetStorage(su_name, *su_args).Modify(name, changes)
494
495   @staticmethod
496   def perspective_storage_execute(params):
497     """Execute an operation on a storage unit.
498
499     """
500     (su_name, su_args, name, op) = params
501     return storage.GetStorage(su_name, *su_args).Execute(name, op)
502
503   # bridge  --------------------------
504
505   @staticmethod
506   def perspective_bridges_exist(params):
507     """Check if all bridges given exist on this node.
508
509     """
510     bridges_list = params[0]
511     return backend.BridgesExist(bridges_list)
512
513   # instance  --------------------------
514
515   @staticmethod
516   def perspective_instance_os_add(params):
517     """Install an OS on a given instance.
518
519     """
520     inst_s = params[0]
521     inst = objects.Instance.FromDict(inst_s)
522     reinstall = params[1]
523     debug = params[2]
524     return backend.InstanceOsAdd(inst, reinstall, debug)
525
526   @staticmethod
527   def perspective_instance_run_rename(params):
528     """Runs the OS rename script for an instance.
529
530     """
531     inst_s, old_name, debug = params
532     inst = objects.Instance.FromDict(inst_s)
533     return backend.RunRenameInstance(inst, old_name, debug)
534
535   @staticmethod
536   def perspective_instance_shutdown(params):
537     """Shutdown an instance.
538
539     """
540     instance = objects.Instance.FromDict(params[0])
541     timeout = params[1]
542     return backend.InstanceShutdown(instance, timeout)
543
544   @staticmethod
545   def perspective_instance_start(params):
546     """Start an instance.
547
548     """
549     instance = objects.Instance.FromDict(params[0])
550     return backend.StartInstance(instance)
551
552   @staticmethod
553   def perspective_migration_info(params):
554     """Gather information about an instance to be migrated.
555
556     """
557     instance = objects.Instance.FromDict(params[0])
558     return backend.MigrationInfo(instance)
559
560   @staticmethod
561   def perspective_accept_instance(params):
562     """Prepare the node to accept an instance.
563
564     """
565     instance, info, target = params
566     instance = objects.Instance.FromDict(instance)
567     return backend.AcceptInstance(instance, info, target)
568
569   @staticmethod
570   def perspective_finalize_migration(params):
571     """Finalize the instance migration.
572
573     """
574     instance, info, success = params
575     instance = objects.Instance.FromDict(instance)
576     return backend.FinalizeMigration(instance, info, success)
577
578   @staticmethod
579   def perspective_instance_migrate(params):
580     """Migrates an instance.
581
582     """
583     instance, target, live = params
584     instance = objects.Instance.FromDict(instance)
585     return backend.MigrateInstance(instance, target, live)
586
587   @staticmethod
588   def perspective_instance_reboot(params):
589     """Reboot an instance.
590
591     """
592     instance = objects.Instance.FromDict(params[0])
593     reboot_type = params[1]
594     shutdown_timeout = params[2]
595     return backend.InstanceReboot(instance, reboot_type, shutdown_timeout)
596
597   @staticmethod
598   def perspective_instance_info(params):
599     """Query instance information.
600
601     """
602     return backend.GetInstanceInfo(params[0], params[1])
603
604   @staticmethod
605   def perspective_instance_migratable(params):
606     """Query whether the specified instance can be migrated.
607
608     """
609     instance = objects.Instance.FromDict(params[0])
610     return backend.GetInstanceMigratable(instance)
611
612   @staticmethod
613   def perspective_all_instances_info(params):
614     """Query information about all instances.
615
616     """
617     return backend.GetAllInstancesInfo(params[0])
618
619   @staticmethod
620   def perspective_instance_list(params):
621     """Query the list of running instances.
622
623     """
624     return backend.GetInstanceList(params[0])
625
626   # node --------------------------
627
628   @staticmethod
629   def perspective_node_tcp_ping(params):
630     """Do a TcpPing on the remote node.
631
632     """
633     return netutils.TcpPing(params[1], params[2], timeout=params[3],
634                             live_port_needed=params[4], source=params[0])
635
636   @staticmethod
637   def perspective_node_has_ip_address(params):
638     """Checks if a node has the given ip address.
639
640     """
641     return netutils.IPAddress.Own(params[0])
642
643   @staticmethod
644   def perspective_node_info(params):
645     """Query node information.
646
647     """
648     vgname, hypervisor_type = params
649     return backend.GetNodeInfo(vgname, hypervisor_type)
650
651   @staticmethod
652   def perspective_etc_hosts_modify(params):
653     """Modify a node entry in /etc/hosts.
654
655     """
656     backend.EtcHostsModify(params[0], params[1], params[2])
657
658     return True
659
660   @staticmethod
661   def perspective_node_verify(params):
662     """Run a verify sequence on this node.
663
664     """
665     return backend.VerifyNode(params[0], params[1])
666
667   @staticmethod
668   def perspective_node_start_master(params):
669     """Promote this node to master status.
670
671     """
672     return backend.StartMaster(params[0], params[1])
673
674   @staticmethod
675   def perspective_node_stop_master(params):
676     """Demote this node from master status.
677
678     """
679     return backend.StopMaster(params[0])
680
681   @staticmethod
682   def perspective_node_leave_cluster(params):
683     """Cleanup after leaving a cluster.
684
685     """
686     return backend.LeaveCluster(params[0])
687
688   @staticmethod
689   def perspective_node_volumes(params):
690     """Query the list of all logical volume groups.
691
692     """
693     return backend.NodeVolumes()
694
695   @staticmethod
696   def perspective_node_demote_from_mc(params):
697     """Demote a node from the master candidate role.
698
699     """
700     return backend.DemoteFromMC()
701
702
703   @staticmethod
704   def perspective_node_powercycle(params):
705     """Tries to powercycle the nod.
706
707     """
708     hypervisor_type = params[0]
709     return backend.PowercycleNode(hypervisor_type)
710
711
712   # cluster --------------------------
713
714   @staticmethod
715   def perspective_version(params):
716     """Query version information.
717
718     """
719     return constants.PROTOCOL_VERSION
720
721   @staticmethod
722   def perspective_upload_file(params):
723     """Upload a file.
724
725     Note that the backend implementation imposes strict rules on which
726     files are accepted.
727
728     """
729     return backend.UploadFile(*params)
730
731   @staticmethod
732   def perspective_master_info(params):
733     """Query master information.
734
735     """
736     return backend.GetMasterInfo()
737
738   @staticmethod
739   def perspective_run_oob(params):
740     """Runs oob on node.
741
742     """
743     output = backend.RunOob(params[0], params[1], params[2], params[3])
744     if output:
745       result = serializer.LoadJson(output)
746     else:
747       result = None
748     return result
749
750   @staticmethod
751   def perspective_write_ssconf_files(params):
752     """Write ssconf files.
753
754     """
755     (values,) = params
756     return backend.WriteSsconfFiles(values)
757
758   # os -----------------------
759
760   @staticmethod
761   def perspective_os_diagnose(params):
762     """Query detailed information about existing OSes.
763
764     """
765     return backend.DiagnoseOS()
766
767   @staticmethod
768   def perspective_os_get(params):
769     """Query information about a given OS.
770
771     """
772     name = params[0]
773     os_obj = backend.OSFromDisk(name)
774     return os_obj.ToDict()
775
776   @staticmethod
777   def perspective_os_validate(params):
778     """Run a given OS' validation routine.
779
780     """
781     required, name, checks, params = params
782     return backend.ValidateOS(required, name, checks, params)
783
784   # hooks -----------------------
785
786   @staticmethod
787   def perspective_hooks_runner(params):
788     """Run hook scripts.
789
790     """
791     hpath, phase, env = params
792     hr = backend.HooksRunner()
793     return hr.RunHooks(hpath, phase, env)
794
795   # iallocator -----------------
796
797   @staticmethod
798   def perspective_iallocator_runner(params):
799     """Run an iallocator script.
800
801     """
802     name, idata = params
803     iar = backend.IAllocatorRunner()
804     return iar.Run(name, idata)
805
806   # test -----------------------
807
808   @staticmethod
809   def perspective_test_delay(params):
810     """Run test delay.
811
812     """
813     duration = params[0]
814     status, rval = utils.TestDelay(duration)
815     if not status:
816       raise backend.RPCFail(rval)
817     return rval
818
819   # file storage ---------------
820
821   @staticmethod
822   def perspective_file_storage_dir_create(params):
823     """Create the file storage directory.
824
825     """
826     file_storage_dir = params[0]
827     return backend.CreateFileStorageDir(file_storage_dir)
828
829   @staticmethod
830   def perspective_file_storage_dir_remove(params):
831     """Remove the file storage directory.
832
833     """
834     file_storage_dir = params[0]
835     return backend.RemoveFileStorageDir(file_storage_dir)
836
837   @staticmethod
838   def perspective_file_storage_dir_rename(params):
839     """Rename the file storage directory.
840
841     """
842     old_file_storage_dir = params[0]
843     new_file_storage_dir = params[1]
844     return backend.RenameFileStorageDir(old_file_storage_dir,
845                                         new_file_storage_dir)
846
847   # jobs ------------------------
848
849   @staticmethod
850   @_RequireJobQueueLock
851   def perspective_jobqueue_update(params):
852     """Update job queue.
853
854     """
855     (file_name, content) = params
856     return backend.JobQueueUpdate(file_name, content)
857
858   @staticmethod
859   @_RequireJobQueueLock
860   def perspective_jobqueue_purge(params):
861     """Purge job queue.
862
863     """
864     return backend.JobQueuePurge()
865
866   @staticmethod
867   @_RequireJobQueueLock
868   def perspective_jobqueue_rename(params):
869     """Rename a job queue file.
870
871     """
872     # TODO: What if a file fails to rename?
873     return [backend.JobQueueRename(old, new) for old, new in params]
874
875   # hypervisor ---------------
876
877   @staticmethod
878   def perspective_hypervisor_validate_params(params):
879     """Validate the hypervisor parameters.
880
881     """
882     (hvname, hvparams) = params
883     return backend.ValidateHVParams(hvname, hvparams)
884
885   # Crypto
886
887   @staticmethod
888   def perspective_x509_cert_create(params):
889     """Creates a new X509 certificate for SSL/TLS.
890
891     """
892     (validity, ) = params
893     return backend.CreateX509Certificate(validity)
894
895   @staticmethod
896   def perspective_x509_cert_remove(params):
897     """Removes a X509 certificate.
898
899     """
900     (name, ) = params
901     return backend.RemoveX509Certificate(name)
902
903   # Import and export
904
905   @staticmethod
906   def perspective_import_start(params):
907     """Starts an import daemon.
908
909     """
910     (opts_s, instance, dest, dest_args) = params
911
912     opts = objects.ImportExportOptions.FromDict(opts_s)
913
914     return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts,
915                                            None, None,
916                                            objects.Instance.FromDict(instance),
917                                            dest,
918                                            _DecodeImportExportIO(dest,
919                                                                  dest_args))
920
921   @staticmethod
922   def perspective_export_start(params):
923     """Starts an export daemon.
924
925     """
926     (opts_s, host, port, instance, source, source_args) = params
927
928     opts = objects.ImportExportOptions.FromDict(opts_s)
929
930     return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts,
931                                            host, port,
932                                            objects.Instance.FromDict(instance),
933                                            source,
934                                            _DecodeImportExportIO(source,
935                                                                  source_args))
936
937   @staticmethod
938   def perspective_impexp_status(params):
939     """Retrieves the status of an import or export daemon.
940
941     """
942     return backend.GetImportExportStatus(params[0])
943
944   @staticmethod
945   def perspective_impexp_abort(params):
946     """Aborts an import or export.
947
948     """
949     return backend.AbortImportExport(params[0])
950
951   @staticmethod
952   def perspective_impexp_cleanup(params):
953     """Cleans up after an import or export.
954
955     """
956     return backend.CleanupImportExport(params[0])
957
958
959 def CheckNoded(_, args):
960   """Initial checks whether to run or exit with a failure.
961
962   """
963   if args: # noded doesn't take any arguments
964     print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
965                           sys.argv[0])
966     sys.exit(constants.EXIT_FAILURE)
967
968
969 def PrepNoded(options, _):
970   """Preparation node daemon function, executed with the PID file held.
971
972   """
973   if options.mlock:
974     request_executor_class = MlockallRequestExecutor
975     try:
976       utils.Mlockall()
977     except errors.NoCtypesError:
978       logging.warning("Cannot set memory lock, ctypes module not found")
979       request_executor_class = http.server.HttpServerRequestExecutor
980   else:
981     request_executor_class = http.server.HttpServerRequestExecutor
982
983   # Read SSL certificate
984   if options.ssl:
985     ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
986                                     ssl_cert_path=options.ssl_cert)
987   else:
988     ssl_params = None
989
990   err = _PrepareQueueLock()
991   if err is not None:
992     # this might be some kind of file-system/permission error; while
993     # this breaks the job queue functionality, we shouldn't prevent
994     # startup of the whole node daemon because of this
995     logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)
996
997   mainloop = daemon.Mainloop()
998   server = NodeHttpServer(mainloop, options.bind_address, options.port,
999                           ssl_params=ssl_params, ssl_verify_peer=True,
1000                           request_executor_class=request_executor_class)
1001   server.Start()
1002   return (mainloop, server)
1003
1004
1005 def ExecNoded(options, args, prep_data): # pylint: disable-msg=W0613
1006   """Main node daemon function, executed with the PID file held.
1007
1008   """
1009   (mainloop, server) = prep_data
1010   try:
1011     mainloop.Run()
1012   finally:
1013     server.Stop()
1014
1015
1016 def Main():
1017   """Main function for the node daemon.
1018
1019   """
1020   parser = OptionParser(description="Ganeti node daemon",
1021                         usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
1022                         version="%%prog (ganeti) %s" %
1023                         constants.RELEASE_VERSION)
1024   parser.add_option("--no-mlock", dest="mlock",
1025                     help="Do not mlock the node memory in ram",
1026                     default=True, action="store_false")
1027
1028   daemon.GenericMain(constants.NODED, parser, CheckNoded, PrepNoded, ExecNoded,
1029                      default_ssl_cert=constants.NODED_CERT_FILE,
1030                      default_ssl_key=constants.NODED_CERT_FILE,
1031                      console_logging=True)