_BaseCondition: allow saving/restoring state
[ganeti-local] / daemons / ganeti-noded
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti node daemon"""
23
24 # pylint: disable-msg=C0103,W0142
25
26 # C0103: Functions in this module need to have a given name structure,
27 # and the name of the daemon doesn't match
28
29 # W0142: Used * or ** magic, since we do use it extensively in this
30 # module
31
32 import os
33 import sys
34 import logging
35 import signal
36
37 from optparse import OptionParser
38
39 from ganeti import backend
40 from ganeti import constants
41 from ganeti import objects
42 from ganeti import errors
43 from ganeti import jstore
44 from ganeti import daemon
45 from ganeti import http
46 from ganeti import utils
47 from ganeti import storage
48 from ganeti import serializer
49
50 import ganeti.http.server # pylint: disable-msg=W0611
51
52
53 queue_lock = None
54
55
56 def _PrepareQueueLock():
57   """Try to prepare the queue lock.
58
59   @return: None for success, otherwise an exception object
60
61   """
62   global queue_lock # pylint: disable-msg=W0603
63
64   if queue_lock is not None:
65     return None
66
67   # Prepare job queue
68   try:
69     queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
70     return None
71   except EnvironmentError, err:
72     return err
73
74
75 def _RequireJobQueueLock(fn):
76   """Decorator for job queue manipulating functions.
77
78   """
79   QUEUE_LOCK_TIMEOUT = 10
80
81   def wrapper(*args, **kwargs):
82     # Locking in exclusive, blocking mode because there could be several
83     # children running at the same time. Waiting up to 10 seconds.
84     if _PrepareQueueLock() is not None:
85       raise errors.JobQueueError("Job queue failed initialization,"
86                                  " cannot update jobs")
87     queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
88     try:
89       return fn(*args, **kwargs)
90     finally:
91       queue_lock.Unlock()
92
93   return wrapper
94
95
96 def _DecodeImportExportIO(ieio, ieioargs):
97   """Decodes import/export I/O information.
98
99   """
100   if ieio == constants.IEIO_RAW_DISK:
101     assert len(ieioargs) == 1
102     return (objects.Disk.FromDict(ieioargs[0]), )
103
104   if ieio == constants.IEIO_SCRIPT:
105     assert len(ieioargs) == 2
106     return (objects.Disk.FromDict(ieioargs[0]), ieioargs[1])
107
108   return ieioargs
109
110
111 class MlockallRequestExecutor(http.server.HttpServerRequestExecutor):
112   """Custom Request Executor class that ensures NodeHttpServer children are
113   locked in ram.
114
115   """
116   def __init__(self, *args, **kwargs):
117     utils.Mlockall()
118
119     http.server.HttpServerRequestExecutor.__init__(self, *args, **kwargs)
120
121
122 class NodeHttpServer(http.server.HttpServer):
123   """The server implementation.
124
125   This class holds all methods exposed over the RPC interface.
126
127   """
128   # too many public methods, and unused args - all methods get params
129   # due to the API
130   # pylint: disable-msg=R0904,W0613
131   def __init__(self, *args, **kwargs):
132     http.server.HttpServer.__init__(self, *args, **kwargs)
133     self.noded_pid = os.getpid()
134
135   def HandleRequest(self, req):
136     """Handle a request.
137
138     """
139     if req.request_method.upper() != http.HTTP_PUT:
140       raise http.HttpBadRequest()
141
142     path = req.request_path
143     if path.startswith("/"):
144       path = path[1:]
145
146     method = getattr(self, "perspective_%s" % path, None)
147     if method is None:
148       raise http.HttpNotFound()
149
150     try:
151       result = (True, method(serializer.LoadJson(req.request_body)))
152
153     except backend.RPCFail, err:
154       # our custom failure exception; str(err) works fine if the
155       # exception was constructed with a single argument, and in
156       # this case, err.message == err.args[0] == str(err)
157       result = (False, str(err))
158     except errors.QuitGanetiException, err:
159       # Tell parent to quit
160       logging.info("Shutting down the node daemon, arguments: %s",
161                    str(err.args))
162       os.kill(self.noded_pid, signal.SIGTERM)
163       # And return the error's arguments, which must be already in
164       # correct tuple format
165       result = err.args
166     except Exception, err:
167       logging.exception("Error in RPC call")
168       result = (False, "Error while executing backend function: %s" % str(err))
169
170     return serializer.DumpJson(result, indent=False)
171
172   # the new block devices  --------------------------
173
174   @staticmethod
175   def perspective_blockdev_create(params):
176     """Create a block device.
177
178     """
179     bdev_s, size, owner, on_primary, info = params
180     bdev = objects.Disk.FromDict(bdev_s)
181     if bdev is None:
182       raise ValueError("can't unserialize data!")
183     return backend.BlockdevCreate(bdev, size, owner, on_primary, info)
184
185   @staticmethod
186   def perspective_blockdev_remove(params):
187     """Remove a block device.
188
189     """
190     bdev_s = params[0]
191     bdev = objects.Disk.FromDict(bdev_s)
192     return backend.BlockdevRemove(bdev)
193
194   @staticmethod
195   def perspective_blockdev_rename(params):
196     """Remove a block device.
197
198     """
199     devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
200     return backend.BlockdevRename(devlist)
201
202   @staticmethod
203   def perspective_blockdev_assemble(params):
204     """Assemble a block device.
205
206     """
207     bdev_s, owner, on_primary = params
208     bdev = objects.Disk.FromDict(bdev_s)
209     if bdev is None:
210       raise ValueError("can't unserialize data!")
211     return backend.BlockdevAssemble(bdev, owner, on_primary)
212
213   @staticmethod
214   def perspective_blockdev_shutdown(params):
215     """Shutdown a block device.
216
217     """
218     bdev_s = params[0]
219     bdev = objects.Disk.FromDict(bdev_s)
220     if bdev is None:
221       raise ValueError("can't unserialize data!")
222     return backend.BlockdevShutdown(bdev)
223
224   @staticmethod
225   def perspective_blockdev_addchildren(params):
226     """Add a child to a mirror device.
227
228     Note: this is only valid for mirror devices. It's the caller's duty
229     to send a correct disk, otherwise we raise an error.
230
231     """
232     bdev_s, ndev_s = params
233     bdev = objects.Disk.FromDict(bdev_s)
234     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
235     if bdev is None or ndevs.count(None) > 0:
236       raise ValueError("can't unserialize data!")
237     return backend.BlockdevAddchildren(bdev, ndevs)
238
239   @staticmethod
240   def perspective_blockdev_removechildren(params):
241     """Remove a child from a mirror device.
242
243     This is only valid for mirror devices, of course. It's the callers
244     duty to send a correct disk, otherwise we raise an error.
245
246     """
247     bdev_s, ndev_s = params
248     bdev = objects.Disk.FromDict(bdev_s)
249     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
250     if bdev is None or ndevs.count(None) > 0:
251       raise ValueError("can't unserialize data!")
252     return backend.BlockdevRemovechildren(bdev, ndevs)
253
254   @staticmethod
255   def perspective_blockdev_getmirrorstatus(params):
256     """Return the mirror status for a list of disks.
257
258     """
259     disks = [objects.Disk.FromDict(dsk_s)
260              for dsk_s in params]
261     return [status.ToDict()
262             for status in backend.BlockdevGetmirrorstatus(disks)]
263
264   @staticmethod
265   def perspective_blockdev_find(params):
266     """Expose the FindBlockDevice functionality for a disk.
267
268     This will try to find but not activate a disk.
269
270     """
271     disk = objects.Disk.FromDict(params[0])
272
273     result = backend.BlockdevFind(disk)
274     if result is None:
275       return None
276
277     return result.ToDict()
278
279   @staticmethod
280   def perspective_blockdev_snapshot(params):
281     """Create a snapshot device.
282
283     Note that this is only valid for LVM disks, if we get passed
284     something else we raise an exception. The snapshot device can be
285     remove by calling the generic block device remove call.
286
287     """
288     cfbd = objects.Disk.FromDict(params[0])
289     return backend.BlockdevSnapshot(cfbd)
290
291   @staticmethod
292   def perspective_blockdev_grow(params):
293     """Grow a stack of devices.
294
295     """
296     cfbd = objects.Disk.FromDict(params[0])
297     amount = params[1]
298     return backend.BlockdevGrow(cfbd, amount)
299
300   @staticmethod
301   def perspective_blockdev_close(params):
302     """Closes the given block devices.
303
304     """
305     disks = [objects.Disk.FromDict(cf) for cf in params[1]]
306     return backend.BlockdevClose(params[0], disks)
307
308   @staticmethod
309   def perspective_blockdev_getsize(params):
310     """Compute the sizes of the given block devices.
311
312     """
313     disks = [objects.Disk.FromDict(cf) for cf in params[0]]
314     return backend.BlockdevGetsize(disks)
315
316   @staticmethod
317   def perspective_blockdev_export(params):
318     """Compute the sizes of the given block devices.
319
320     """
321     disk = objects.Disk.FromDict(params[0])
322     dest_node, dest_path, cluster_name = params[1:]
323     return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)
324
325   # blockdev/drbd specific methods ----------
326
327   @staticmethod
328   def perspective_drbd_disconnect_net(params):
329     """Disconnects the network connection of drbd disks.
330
331     Note that this is only valid for drbd disks, so the members of the
332     disk list must all be drbd devices.
333
334     """
335     nodes_ip, disks = params
336     disks = [objects.Disk.FromDict(cf) for cf in disks]
337     return backend.DrbdDisconnectNet(nodes_ip, disks)
338
339   @staticmethod
340   def perspective_drbd_attach_net(params):
341     """Attaches the network connection of drbd disks.
342
343     Note that this is only valid for drbd disks, so the members of the
344     disk list must all be drbd devices.
345
346     """
347     nodes_ip, disks, instance_name, multimaster = params
348     disks = [objects.Disk.FromDict(cf) for cf in disks]
349     return backend.DrbdAttachNet(nodes_ip, disks,
350                                      instance_name, multimaster)
351
352   @staticmethod
353   def perspective_drbd_wait_sync(params):
354     """Wait until DRBD disks are synched.
355
356     Note that this is only valid for drbd disks, so the members of the
357     disk list must all be drbd devices.
358
359     """
360     nodes_ip, disks = params
361     disks = [objects.Disk.FromDict(cf) for cf in disks]
362     return backend.DrbdWaitSync(nodes_ip, disks)
363
364   # export/import  --------------------------
365
366   @staticmethod
367   def perspective_finalize_export(params):
368     """Expose the finalize export functionality.
369
370     """
371     instance = objects.Instance.FromDict(params[0])
372
373     snap_disks = []
374     for disk in params[1]:
375       if isinstance(disk, bool):
376         snap_disks.append(disk)
377       else:
378         snap_disks.append(objects.Disk.FromDict(disk))
379
380     return backend.FinalizeExport(instance, snap_disks)
381
382   @staticmethod
383   def perspective_export_info(params):
384     """Query information about an existing export on this node.
385
386     The given path may not contain an export, in which case we return
387     None.
388
389     """
390     path = params[0]
391     return backend.ExportInfo(path)
392
393   @staticmethod
394   def perspective_export_list(params):
395     """List the available exports on this node.
396
397     Note that as opposed to export_info, which may query data about an
398     export in any path, this only queries the standard Ganeti path
399     (constants.EXPORT_DIR).
400
401     """
402     return backend.ListExports()
403
404   @staticmethod
405   def perspective_export_remove(params):
406     """Remove an export.
407
408     """
409     export = params[0]
410     return backend.RemoveExport(export)
411
412   # volume  --------------------------
413
414   @staticmethod
415   def perspective_lv_list(params):
416     """Query the list of logical volumes in a given volume group.
417
418     """
419     vgname = params[0]
420     return backend.GetVolumeList(vgname)
421
422   @staticmethod
423   def perspective_vg_list(params):
424     """Query the list of volume groups.
425
426     """
427     return backend.ListVolumeGroups()
428
429   # Storage --------------------------
430
431   @staticmethod
432   def perspective_storage_list(params):
433     """Get list of storage units.
434
435     """
436     (su_name, su_args, name, fields) = params
437     return storage.GetStorage(su_name, *su_args).List(name, fields)
438
439   @staticmethod
440   def perspective_storage_modify(params):
441     """Modify a storage unit.
442
443     """
444     (su_name, su_args, name, changes) = params
445     return storage.GetStorage(su_name, *su_args).Modify(name, changes)
446
447   @staticmethod
448   def perspective_storage_execute(params):
449     """Execute an operation on a storage unit.
450
451     """
452     (su_name, su_args, name, op) = params
453     return storage.GetStorage(su_name, *su_args).Execute(name, op)
454
455   # bridge  --------------------------
456
457   @staticmethod
458   def perspective_bridges_exist(params):
459     """Check if all bridges given exist on this node.
460
461     """
462     bridges_list = params[0]
463     return backend.BridgesExist(bridges_list)
464
465   # instance  --------------------------
466
467   @staticmethod
468   def perspective_instance_os_add(params):
469     """Install an OS on a given instance.
470
471     """
472     inst_s = params[0]
473     inst = objects.Instance.FromDict(inst_s)
474     reinstall = params[1]
475     debug = params[2]
476     return backend.InstanceOsAdd(inst, reinstall, debug)
477
478   @staticmethod
479   def perspective_instance_run_rename(params):
480     """Runs the OS rename script for an instance.
481
482     """
483     inst_s, old_name, debug = params
484     inst = objects.Instance.FromDict(inst_s)
485     return backend.RunRenameInstance(inst, old_name, debug)
486
487   @staticmethod
488   def perspective_instance_shutdown(params):
489     """Shutdown an instance.
490
491     """
492     instance = objects.Instance.FromDict(params[0])
493     timeout = params[1]
494     return backend.InstanceShutdown(instance, timeout)
495
496   @staticmethod
497   def perspective_instance_start(params):
498     """Start an instance.
499
500     """
501     instance = objects.Instance.FromDict(params[0])
502     return backend.StartInstance(instance)
503
504   @staticmethod
505   def perspective_migration_info(params):
506     """Gather information about an instance to be migrated.
507
508     """
509     instance = objects.Instance.FromDict(params[0])
510     return backend.MigrationInfo(instance)
511
512   @staticmethod
513   def perspective_accept_instance(params):
514     """Prepare the node to accept an instance.
515
516     """
517     instance, info, target = params
518     instance = objects.Instance.FromDict(instance)
519     return backend.AcceptInstance(instance, info, target)
520
521   @staticmethod
522   def perspective_finalize_migration(params):
523     """Finalize the instance migration.
524
525     """
526     instance, info, success = params
527     instance = objects.Instance.FromDict(instance)
528     return backend.FinalizeMigration(instance, info, success)
529
530   @staticmethod
531   def perspective_instance_migrate(params):
532     """Migrates an instance.
533
534     """
535     instance, target, live = params
536     instance = objects.Instance.FromDict(instance)
537     return backend.MigrateInstance(instance, target, live)
538
539   @staticmethod
540   def perspective_instance_reboot(params):
541     """Reboot an instance.
542
543     """
544     instance = objects.Instance.FromDict(params[0])
545     reboot_type = params[1]
546     shutdown_timeout = params[2]
547     return backend.InstanceReboot(instance, reboot_type, shutdown_timeout)
548
549   @staticmethod
550   def perspective_instance_info(params):
551     """Query instance information.
552
553     """
554     return backend.GetInstanceInfo(params[0], params[1])
555
556   @staticmethod
557   def perspective_instance_migratable(params):
558     """Query whether the specified instance can be migrated.
559
560     """
561     instance = objects.Instance.FromDict(params[0])
562     return backend.GetInstanceMigratable(instance)
563
564   @staticmethod
565   def perspective_all_instances_info(params):
566     """Query information about all instances.
567
568     """
569     return backend.GetAllInstancesInfo(params[0])
570
571   @staticmethod
572   def perspective_instance_list(params):
573     """Query the list of running instances.
574
575     """
576     return backend.GetInstanceList(params[0])
577
578   # node --------------------------
579
580   @staticmethod
581   def perspective_node_tcp_ping(params):
582     """Do a TcpPing on the remote node.
583
584     """
585     return utils.TcpPing(params[1], params[2], timeout=params[3],
586                          live_port_needed=params[4], source=params[0])
587
588   @staticmethod
589   def perspective_node_has_ip_address(params):
590     """Checks if a node has the given ip address.
591
592     """
593     return utils.OwnIpAddress(params[0])
594
595   @staticmethod
596   def perspective_node_info(params):
597     """Query node information.
598
599     """
600     vgname, hypervisor_type = params
601     return backend.GetNodeInfo(vgname, hypervisor_type)
602
603   @staticmethod
604   def perspective_node_add(params):
605     """Complete the registration of this node in the cluster.
606
607     """
608     return backend.AddNode(params[0], params[1], params[2],
609                            params[3], params[4], params[5])
610
611   @staticmethod
612   def perspective_node_verify(params):
613     """Run a verify sequence on this node.
614
615     """
616     return backend.VerifyNode(params[0], params[1])
617
618   @staticmethod
619   def perspective_node_start_master(params):
620     """Promote this node to master status.
621
622     """
623     return backend.StartMaster(params[0], params[1])
624
625   @staticmethod
626   def perspective_node_stop_master(params):
627     """Demote this node from master status.
628
629     """
630     return backend.StopMaster(params[0])
631
632   @staticmethod
633   def perspective_node_leave_cluster(params):
634     """Cleanup after leaving a cluster.
635
636     """
637     return backend.LeaveCluster(params[0])
638
639   @staticmethod
640   def perspective_node_volumes(params):
641     """Query the list of all logical volume groups.
642
643     """
644     return backend.NodeVolumes()
645
646   @staticmethod
647   def perspective_node_demote_from_mc(params):
648     """Demote a node from the master candidate role.
649
650     """
651     return backend.DemoteFromMC()
652
653
654   @staticmethod
655   def perspective_node_powercycle(params):
656     """Tries to powercycle the nod.
657
658     """
659     hypervisor_type = params[0]
660     return backend.PowercycleNode(hypervisor_type)
661
662
663   # cluster --------------------------
664
665   @staticmethod
666   def perspective_version(params):
667     """Query version information.
668
669     """
670     return constants.PROTOCOL_VERSION
671
672   @staticmethod
673   def perspective_upload_file(params):
674     """Upload a file.
675
676     Note that the backend implementation imposes strict rules on which
677     files are accepted.
678
679     """
680     return backend.UploadFile(*params)
681
682   @staticmethod
683   def perspective_master_info(params):
684     """Query master information.
685
686     """
687     return backend.GetMasterInfo()
688
689   @staticmethod
690   def perspective_write_ssconf_files(params):
691     """Write ssconf files.
692
693     """
694     (values,) = params
695     return backend.WriteSsconfFiles(values)
696
697   # os -----------------------
698
699   @staticmethod
700   def perspective_os_diagnose(params):
701     """Query detailed information about existing OSes.
702
703     """
704     return backend.DiagnoseOS()
705
706   @staticmethod
707   def perspective_os_get(params):
708     """Query information about a given OS.
709
710     """
711     name = params[0]
712     os_obj = backend.OSFromDisk(name)
713     return os_obj.ToDict()
714
715   # hooks -----------------------
716
717   @staticmethod
718   def perspective_hooks_runner(params):
719     """Run hook scripts.
720
721     """
722     hpath, phase, env = params
723     hr = backend.HooksRunner()
724     return hr.RunHooks(hpath, phase, env)
725
726   # iallocator -----------------
727
728   @staticmethod
729   def perspective_iallocator_runner(params):
730     """Run an iallocator script.
731
732     """
733     name, idata = params
734     iar = backend.IAllocatorRunner()
735     return iar.Run(name, idata)
736
737   # test -----------------------
738
739   @staticmethod
740   def perspective_test_delay(params):
741     """Run test delay.
742
743     """
744     duration = params[0]
745     status, rval = utils.TestDelay(duration)
746     if not status:
747       raise backend.RPCFail(rval)
748     return rval
749
750   # file storage ---------------
751
752   @staticmethod
753   def perspective_file_storage_dir_create(params):
754     """Create the file storage directory.
755
756     """
757     file_storage_dir = params[0]
758     return backend.CreateFileStorageDir(file_storage_dir)
759
760   @staticmethod
761   def perspective_file_storage_dir_remove(params):
762     """Remove the file storage directory.
763
764     """
765     file_storage_dir = params[0]
766     return backend.RemoveFileStorageDir(file_storage_dir)
767
768   @staticmethod
769   def perspective_file_storage_dir_rename(params):
770     """Rename the file storage directory.
771
772     """
773     old_file_storage_dir = params[0]
774     new_file_storage_dir = params[1]
775     return backend.RenameFileStorageDir(old_file_storage_dir,
776                                         new_file_storage_dir)
777
778   # jobs ------------------------
779
780   @staticmethod
781   @_RequireJobQueueLock
782   def perspective_jobqueue_update(params):
783     """Update job queue.
784
785     """
786     (file_name, content) = params
787     return backend.JobQueueUpdate(file_name, content)
788
789   @staticmethod
790   @_RequireJobQueueLock
791   def perspective_jobqueue_purge(params):
792     """Purge job queue.
793
794     """
795     return backend.JobQueuePurge()
796
797   @staticmethod
798   @_RequireJobQueueLock
799   def perspective_jobqueue_rename(params):
800     """Rename a job queue file.
801
802     """
803     # TODO: What if a file fails to rename?
804     return [backend.JobQueueRename(old, new) for old, new in params]
805
806   @staticmethod
807   def perspective_jobqueue_set_drain(params):
808     """Set/unset the queue drain flag.
809
810     """
811     drain_flag = params[0]
812     return backend.JobQueueSetDrainFlag(drain_flag)
813
814
815   # hypervisor ---------------
816
817   @staticmethod
818   def perspective_hypervisor_validate_params(params):
819     """Validate the hypervisor parameters.
820
821     """
822     (hvname, hvparams) = params
823     return backend.ValidateHVParams(hvname, hvparams)
824
825   # Crypto
826
827   @staticmethod
828   def perspective_x509_cert_create(params):
829     """Creates a new X509 certificate for SSL/TLS.
830
831     """
832     (validity, ) = params
833     return backend.CreateX509Certificate(validity)
834
835   @staticmethod
836   def perspective_x509_cert_remove(params):
837     """Removes a X509 certificate.
838
839     """
840     (name, ) = params
841     return backend.RemoveX509Certificate(name)
842
843   # Import and export
844
845   @staticmethod
846   def perspective_import_start(params):
847     """Starts an import daemon.
848
849     """
850     (opts_s, instance, dest, dest_args) = params
851
852     opts = objects.ImportExportOptions.FromDict(opts_s)
853
854     return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts,
855                                            None, None,
856                                            objects.Instance.FromDict(instance),
857                                            dest,
858                                            _DecodeImportExportIO(dest,
859                                                                  dest_args))
860
861   @staticmethod
862   def perspective_export_start(params):
863     """Starts an export daemon.
864
865     """
866     (opts_s, host, port, instance, source, source_args) = params
867
868     opts = objects.ImportExportOptions.FromDict(opts_s)
869
870     return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts,
871                                            host, port,
872                                            objects.Instance.FromDict(instance),
873                                            source,
874                                            _DecodeImportExportIO(source,
875                                                                  source_args))
876
877   @staticmethod
878   def perspective_impexp_status(params):
879     """Retrieves the status of an import or export daemon.
880
881     """
882     return backend.GetImportExportStatus(params[0])
883
884   @staticmethod
885   def perspective_impexp_abort(params):
886     """Aborts an import or export.
887
888     """
889     return backend.AbortImportExport(params[0])
890
891   @staticmethod
892   def perspective_impexp_cleanup(params):
893     """Cleans up after an import or export.
894
895     """
896     return backend.CleanupImportExport(params[0])
897
898
899 def CheckNoded(_, args):
900   """Initial checks whether to run or exit with a failure.
901
902   """
903   if args: # noded doesn't take any arguments
904     print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
905                           sys.argv[0])
906     sys.exit(constants.EXIT_FAILURE)
907
908
909 def ExecNoded(options, _):
910   """Main node daemon function, executed with the PID file held.
911
912   """
913   if options.mlock:
914     utils.Mlockall()
915     request_executor_class = MlockallRequestExecutor
916   else:
917     request_executor_class = http.server.HttpServerRequestExecutor
918
919   # Read SSL certificate
920   if options.ssl:
921     ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
922                                     ssl_cert_path=options.ssl_cert)
923   else:
924     ssl_params = None
925
926   err = _PrepareQueueLock()
927   if err is not None:
928     # this might be some kind of file-system/permission error; while
929     # this breaks the job queue functionality, we shouldn't prevent
930     # startup of the whole node daemon because of this
931     logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)
932
933   mainloop = daemon.Mainloop()
934   server = NodeHttpServer(mainloop, options.bind_address, options.port,
935                           ssl_params=ssl_params, ssl_verify_peer=True,
936                           request_executor_class=request_executor_class)
937   server.Start()
938   try:
939     mainloop.Run()
940   finally:
941     server.Stop()
942
943
944 def main():
945   """Main function for the node daemon.
946
947   """
948   parser = OptionParser(description="Ganeti node daemon",
949                         usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
950                         version="%%prog (ganeti) %s" %
951                         constants.RELEASE_VERSION)
952   parser.add_option("--no-mlock", dest="mlock",
953                     help="Do not mlock the node memory in ram",
954                     default=True, action="store_false")
955
956   dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
957   dirs.append((constants.LOG_OS_DIR, 0750))
958   dirs.append((constants.LOCK_DIR, 1777))
959   dirs.append((constants.CRYPTO_KEYS_DIR, constants.CRYPTO_KEYS_DIR_MODE))
960   dirs.append((constants.IMPORT_EXPORT_DIR, constants.IMPORT_EXPORT_DIR_MODE))
961   daemon.GenericMain(constants.NODED, parser, dirs, CheckNoded, ExecNoded,
962                      default_ssl_cert=constants.NODED_CERT_FILE,
963                      default_ssl_key=constants.NODED_CERT_FILE,
964                      console_logging=True)
965
966
967 if __name__ == '__main__':
968   main()