Rename import/export RPC calls to match others
[ganeti-local] / daemons / ganeti-noded
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti node daemon"""
23
24 # pylint: disable-msg=C0103,W0142
25
26 # C0103: Functions in this module need to have a given name structure,
27 # and the name of the daemon doesn't match
28
29 # W0142: Used * or ** magic, since we do use it extensively in this
30 # module
31
32 import os
33 import sys
34 import logging
35 import signal
36
37 from optparse import OptionParser
38
39 from ganeti import backend
40 from ganeti import constants
41 from ganeti import objects
42 from ganeti import errors
43 from ganeti import jstore
44 from ganeti import daemon
45 from ganeti import http
46 from ganeti import utils
47 from ganeti import storage
48 from ganeti import serializer
49
50 import ganeti.http.server # pylint: disable-msg=W0611
51
52
53 queue_lock = None
54
55
56 def _PrepareQueueLock():
57   """Try to prepare the queue lock.
58
59   @return: None for success, otherwise an exception object
60
61   """
62   global queue_lock # pylint: disable-msg=W0603
63
64   if queue_lock is not None:
65     return None
66
67   # Prepare job queue
68   try:
69     queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
70     return None
71   except EnvironmentError, err:
72     return err
73
74
75 def _RequireJobQueueLock(fn):
76   """Decorator for job queue manipulating functions.
77
78   """
79   QUEUE_LOCK_TIMEOUT = 10
80
81   def wrapper(*args, **kwargs):
82     # Locking in exclusive, blocking mode because there could be several
83     # children running at the same time. Waiting up to 10 seconds.
84     if _PrepareQueueLock() is not None:
85       raise errors.JobQueueError("Job queue failed initialization,"
86                                  " cannot update jobs")
87     queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
88     try:
89       return fn(*args, **kwargs)
90     finally:
91       queue_lock.Unlock()
92
93   return wrapper
94
95
96 def _DecodeImportExportIO(ieio, ieioargs):
97   """Decodes import/export I/O information.
98
99   """
100   if ieio == constants.IEIO_RAW_DISK:
101     assert len(ieioargs) == 1
102     return (objects.Disk.FromDict(ieioargs[0]), )
103
104   if ieio == constants.IEIO_SCRIPT:
105     assert len(ieioargs) == 2
106     return (objects.Disk.FromDict(ieioargs[0]), ieioargs[1])
107
108   return ieioargs
109
110
111 class NodeHttpServer(http.server.HttpServer):
112   """The server implementation.
113
114   This class holds all methods exposed over the RPC interface.
115
116   """
117   # too many public methods, and unused args - all methods get params
118   # due to the API
119   # pylint: disable-msg=R0904,W0613
120   def __init__(self, *args, **kwargs):
121     http.server.HttpServer.__init__(self, *args, **kwargs)
122     self.noded_pid = os.getpid()
123
124   def HandleRequest(self, req):
125     """Handle a request.
126
127     """
128     if req.request_method.upper() != http.HTTP_PUT:
129       raise http.HttpBadRequest()
130
131     path = req.request_path
132     if path.startswith("/"):
133       path = path[1:]
134
135     method = getattr(self, "perspective_%s" % path, None)
136     if method is None:
137       raise http.HttpNotFound()
138
139     try:
140       result = (True, method(serializer.LoadJson(req.request_body)))
141
142     except backend.RPCFail, err:
143       # our custom failure exception; str(err) works fine if the
144       # exception was constructed with a single argument, and in
145       # this case, err.message == err.args[0] == str(err)
146       result = (False, str(err))
147     except errors.QuitGanetiException, err:
148       # Tell parent to quit
149       logging.info("Shutting down the node daemon, arguments: %s",
150                    str(err.args))
151       os.kill(self.noded_pid, signal.SIGTERM)
152       # And return the error's arguments, which must be already in
153       # correct tuple format
154       result = err.args
155     except Exception, err:
156       logging.exception("Error in RPC call")
157       result = (False, "Error while executing backend function: %s" % str(err))
158
159     return serializer.DumpJson(result, indent=False)
160
161   # the new block devices  --------------------------
162
163   @staticmethod
164   def perspective_blockdev_create(params):
165     """Create a block device.
166
167     """
168     bdev_s, size, owner, on_primary, info = params
169     bdev = objects.Disk.FromDict(bdev_s)
170     if bdev is None:
171       raise ValueError("can't unserialize data!")
172     return backend.BlockdevCreate(bdev, size, owner, on_primary, info)
173
174   @staticmethod
175   def perspective_blockdev_remove(params):
176     """Remove a block device.
177
178     """
179     bdev_s = params[0]
180     bdev = objects.Disk.FromDict(bdev_s)
181     return backend.BlockdevRemove(bdev)
182
183   @staticmethod
184   def perspective_blockdev_rename(params):
185     """Remove a block device.
186
187     """
188     devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
189     return backend.BlockdevRename(devlist)
190
191   @staticmethod
192   def perspective_blockdev_assemble(params):
193     """Assemble a block device.
194
195     """
196     bdev_s, owner, on_primary = params
197     bdev = objects.Disk.FromDict(bdev_s)
198     if bdev is None:
199       raise ValueError("can't unserialize data!")
200     return backend.BlockdevAssemble(bdev, owner, on_primary)
201
202   @staticmethod
203   def perspective_blockdev_shutdown(params):
204     """Shutdown a block device.
205
206     """
207     bdev_s = params[0]
208     bdev = objects.Disk.FromDict(bdev_s)
209     if bdev is None:
210       raise ValueError("can't unserialize data!")
211     return backend.BlockdevShutdown(bdev)
212
213   @staticmethod
214   def perspective_blockdev_addchildren(params):
215     """Add a child to a mirror device.
216
217     Note: this is only valid for mirror devices. It's the caller's duty
218     to send a correct disk, otherwise we raise an error.
219
220     """
221     bdev_s, ndev_s = params
222     bdev = objects.Disk.FromDict(bdev_s)
223     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
224     if bdev is None or ndevs.count(None) > 0:
225       raise ValueError("can't unserialize data!")
226     return backend.BlockdevAddchildren(bdev, ndevs)
227
228   @staticmethod
229   def perspective_blockdev_removechildren(params):
230     """Remove a child from a mirror device.
231
232     This is only valid for mirror devices, of course. It's the callers
233     duty to send a correct disk, otherwise we raise an error.
234
235     """
236     bdev_s, ndev_s = params
237     bdev = objects.Disk.FromDict(bdev_s)
238     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
239     if bdev is None or ndevs.count(None) > 0:
240       raise ValueError("can't unserialize data!")
241     return backend.BlockdevRemovechildren(bdev, ndevs)
242
243   @staticmethod
244   def perspective_blockdev_getmirrorstatus(params):
245     """Return the mirror status for a list of disks.
246
247     """
248     disks = [objects.Disk.FromDict(dsk_s)
249              for dsk_s in params]
250     return [status.ToDict()
251             for status in backend.BlockdevGetmirrorstatus(disks)]
252
253   @staticmethod
254   def perspective_blockdev_find(params):
255     """Expose the FindBlockDevice functionality for a disk.
256
257     This will try to find but not activate a disk.
258
259     """
260     disk = objects.Disk.FromDict(params[0])
261
262     result = backend.BlockdevFind(disk)
263     if result is None:
264       return None
265
266     return result.ToDict()
267
268   @staticmethod
269   def perspective_blockdev_snapshot(params):
270     """Create a snapshot device.
271
272     Note that this is only valid for LVM disks, if we get passed
273     something else we raise an exception. The snapshot device can be
274     remove by calling the generic block device remove call.
275
276     """
277     cfbd = objects.Disk.FromDict(params[0])
278     return backend.BlockdevSnapshot(cfbd)
279
280   @staticmethod
281   def perspective_blockdev_grow(params):
282     """Grow a stack of devices.
283
284     """
285     cfbd = objects.Disk.FromDict(params[0])
286     amount = params[1]
287     return backend.BlockdevGrow(cfbd, amount)
288
289   @staticmethod
290   def perspective_blockdev_close(params):
291     """Closes the given block devices.
292
293     """
294     disks = [objects.Disk.FromDict(cf) for cf in params[1]]
295     return backend.BlockdevClose(params[0], disks)
296
297   @staticmethod
298   def perspective_blockdev_getsize(params):
299     """Compute the sizes of the given block devices.
300
301     """
302     disks = [objects.Disk.FromDict(cf) for cf in params[0]]
303     return backend.BlockdevGetsize(disks)
304
305   @staticmethod
306   def perspective_blockdev_export(params):
307     """Compute the sizes of the given block devices.
308
309     """
310     disk = objects.Disk.FromDict(params[0])
311     dest_node, dest_path, cluster_name = params[1:]
312     return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)
313
314   # blockdev/drbd specific methods ----------
315
316   @staticmethod
317   def perspective_drbd_disconnect_net(params):
318     """Disconnects the network connection of drbd disks.
319
320     Note that this is only valid for drbd disks, so the members of the
321     disk list must all be drbd devices.
322
323     """
324     nodes_ip, disks = params
325     disks = [objects.Disk.FromDict(cf) for cf in disks]
326     return backend.DrbdDisconnectNet(nodes_ip, disks)
327
328   @staticmethod
329   def perspective_drbd_attach_net(params):
330     """Attaches the network connection of drbd disks.
331
332     Note that this is only valid for drbd disks, so the members of the
333     disk list must all be drbd devices.
334
335     """
336     nodes_ip, disks, instance_name, multimaster = params
337     disks = [objects.Disk.FromDict(cf) for cf in disks]
338     return backend.DrbdAttachNet(nodes_ip, disks,
339                                      instance_name, multimaster)
340
341   @staticmethod
342   def perspective_drbd_wait_sync(params):
343     """Wait until DRBD disks are synched.
344
345     Note that this is only valid for drbd disks, so the members of the
346     disk list must all be drbd devices.
347
348     """
349     nodes_ip, disks = params
350     disks = [objects.Disk.FromDict(cf) for cf in disks]
351     return backend.DrbdWaitSync(nodes_ip, disks)
352
353   # export/import  --------------------------
354
355   @staticmethod
356   def perspective_snapshot_export(params):
357     """Export a given snapshot.
358
359     """
360     disk = objects.Disk.FromDict(params[0])
361     dest_node = params[1]
362     instance = objects.Instance.FromDict(params[2])
363     cluster_name = params[3]
364     dev_idx = params[4]
365     debug = params[5]
366     return backend.ExportSnapshot(disk, dest_node, instance,
367                                   cluster_name, dev_idx, debug)
368
369   @staticmethod
370   def perspective_finalize_export(params):
371     """Expose the finalize export functionality.
372
373     """
374     instance = objects.Instance.FromDict(params[0])
375
376     snap_disks = []
377     for disk in params[1]:
378       if isinstance(disk, bool):
379         snap_disks.append(disk)
380       else:
381         snap_disks.append(objects.Disk.FromDict(disk))
382
383     return backend.FinalizeExport(instance, snap_disks)
384
385   @staticmethod
386   def perspective_export_info(params):
387     """Query information about an existing export on this node.
388
389     The given path may not contain an export, in which case we return
390     None.
391
392     """
393     path = params[0]
394     return backend.ExportInfo(path)
395
396   @staticmethod
397   def perspective_export_list(params):
398     """List the available exports on this node.
399
400     Note that as opposed to export_info, which may query data about an
401     export in any path, this only queries the standard Ganeti path
402     (constants.EXPORT_DIR).
403
404     """
405     return backend.ListExports()
406
407   @staticmethod
408   def perspective_export_remove(params):
409     """Remove an export.
410
411     """
412     export = params[0]
413     return backend.RemoveExport(export)
414
415   # volume  --------------------------
416
417   @staticmethod
418   def perspective_lv_list(params):
419     """Query the list of logical volumes in a given volume group.
420
421     """
422     vgname = params[0]
423     return backend.GetVolumeList(vgname)
424
425   @staticmethod
426   def perspective_vg_list(params):
427     """Query the list of volume groups.
428
429     """
430     return backend.ListVolumeGroups()
431
432   # Storage --------------------------
433
434   @staticmethod
435   def perspective_storage_list(params):
436     """Get list of storage units.
437
438     """
439     (su_name, su_args, name, fields) = params
440     return storage.GetStorage(su_name, *su_args).List(name, fields)
441
442   @staticmethod
443   def perspective_storage_modify(params):
444     """Modify a storage unit.
445
446     """
447     (su_name, su_args, name, changes) = params
448     return storage.GetStorage(su_name, *su_args).Modify(name, changes)
449
450   @staticmethod
451   def perspective_storage_execute(params):
452     """Execute an operation on a storage unit.
453
454     """
455     (su_name, su_args, name, op) = params
456     return storage.GetStorage(su_name, *su_args).Execute(name, op)
457
458   # bridge  --------------------------
459
460   @staticmethod
461   def perspective_bridges_exist(params):
462     """Check if all bridges given exist on this node.
463
464     """
465     bridges_list = params[0]
466     return backend.BridgesExist(bridges_list)
467
468   # instance  --------------------------
469
470   @staticmethod
471   def perspective_instance_os_add(params):
472     """Install an OS on a given instance.
473
474     """
475     inst_s = params[0]
476     inst = objects.Instance.FromDict(inst_s)
477     reinstall = params[1]
478     debug = params[2]
479     return backend.InstanceOsAdd(inst, reinstall, debug)
480
481   @staticmethod
482   def perspective_instance_run_rename(params):
483     """Runs the OS rename script for an instance.
484
485     """
486     inst_s, old_name, debug = params
487     inst = objects.Instance.FromDict(inst_s)
488     return backend.RunRenameInstance(inst, old_name, debug)
489
490   @staticmethod
491   def perspective_instance_os_import(params):
492     """Run the import function of an OS onto a given instance.
493
494     """
495     inst_s, src_node, src_images, cluster_name, debug = params
496     inst = objects.Instance.FromDict(inst_s)
497     return backend.ImportOSIntoInstance(inst, src_node, src_images,
498                                         cluster_name, debug)
499
500   @staticmethod
501   def perspective_instance_shutdown(params):
502     """Shutdown an instance.
503
504     """
505     instance = objects.Instance.FromDict(params[0])
506     timeout = params[1]
507     return backend.InstanceShutdown(instance, timeout)
508
509   @staticmethod
510   def perspective_instance_start(params):
511     """Start an instance.
512
513     """
514     instance = objects.Instance.FromDict(params[0])
515     return backend.StartInstance(instance)
516
517   @staticmethod
518   def perspective_migration_info(params):
519     """Gather information about an instance to be migrated.
520
521     """
522     instance = objects.Instance.FromDict(params[0])
523     return backend.MigrationInfo(instance)
524
525   @staticmethod
526   def perspective_accept_instance(params):
527     """Prepare the node to accept an instance.
528
529     """
530     instance, info, target = params
531     instance = objects.Instance.FromDict(instance)
532     return backend.AcceptInstance(instance, info, target)
533
534   @staticmethod
535   def perspective_finalize_migration(params):
536     """Finalize the instance migration.
537
538     """
539     instance, info, success = params
540     instance = objects.Instance.FromDict(instance)
541     return backend.FinalizeMigration(instance, info, success)
542
543   @staticmethod
544   def perspective_instance_migrate(params):
545     """Migrates an instance.
546
547     """
548     instance, target, live = params
549     instance = objects.Instance.FromDict(instance)
550     return backend.MigrateInstance(instance, target, live)
551
552   @staticmethod
553   def perspective_instance_reboot(params):
554     """Reboot an instance.
555
556     """
557     instance = objects.Instance.FromDict(params[0])
558     reboot_type = params[1]
559     shutdown_timeout = params[2]
560     return backend.InstanceReboot(instance, reboot_type, shutdown_timeout)
561
562   @staticmethod
563   def perspective_instance_info(params):
564     """Query instance information.
565
566     """
567     return backend.GetInstanceInfo(params[0], params[1])
568
569   @staticmethod
570   def perspective_instance_migratable(params):
571     """Query whether the specified instance can be migrated.
572
573     """
574     instance = objects.Instance.FromDict(params[0])
575     return backend.GetInstanceMigratable(instance)
576
577   @staticmethod
578   def perspective_all_instances_info(params):
579     """Query information about all instances.
580
581     """
582     return backend.GetAllInstancesInfo(params[0])
583
584   @staticmethod
585   def perspective_instance_list(params):
586     """Query the list of running instances.
587
588     """
589     return backend.GetInstanceList(params[0])
590
591   # node --------------------------
592
593   @staticmethod
594   def perspective_node_tcp_ping(params):
595     """Do a TcpPing on the remote node.
596
597     """
598     return utils.TcpPing(params[1], params[2], timeout=params[3],
599                          live_port_needed=params[4], source=params[0])
600
601   @staticmethod
602   def perspective_node_has_ip_address(params):
603     """Checks if a node has the given ip address.
604
605     """
606     return utils.OwnIpAddress(params[0])
607
608   @staticmethod
609   def perspective_node_info(params):
610     """Query node information.
611
612     """
613     vgname, hypervisor_type = params
614     return backend.GetNodeInfo(vgname, hypervisor_type)
615
616   @staticmethod
617   def perspective_node_add(params):
618     """Complete the registration of this node in the cluster.
619
620     """
621     return backend.AddNode(params[0], params[1], params[2],
622                            params[3], params[4], params[5])
623
624   @staticmethod
625   def perspective_node_verify(params):
626     """Run a verify sequence on this node.
627
628     """
629     return backend.VerifyNode(params[0], params[1])
630
631   @staticmethod
632   def perspective_node_start_master(params):
633     """Promote this node to master status.
634
635     """
636     return backend.StartMaster(params[0], params[1])
637
638   @staticmethod
639   def perspective_node_stop_master(params):
640     """Demote this node from master status.
641
642     """
643     return backend.StopMaster(params[0])
644
645   @staticmethod
646   def perspective_node_leave_cluster(params):
647     """Cleanup after leaving a cluster.
648
649     """
650     return backend.LeaveCluster(params[0])
651
652   @staticmethod
653   def perspective_node_volumes(params):
654     """Query the list of all logical volume groups.
655
656     """
657     return backend.NodeVolumes()
658
659   @staticmethod
660   def perspective_node_demote_from_mc(params):
661     """Demote a node from the master candidate role.
662
663     """
664     return backend.DemoteFromMC()
665
666
667   @staticmethod
668   def perspective_node_powercycle(params):
669     """Tries to powercycle the nod.
670
671     """
672     hypervisor_type = params[0]
673     return backend.PowercycleNode(hypervisor_type)
674
675
676   # cluster --------------------------
677
678   @staticmethod
679   def perspective_version(params):
680     """Query version information.
681
682     """
683     return constants.PROTOCOL_VERSION
684
685   @staticmethod
686   def perspective_upload_file(params):
687     """Upload a file.
688
689     Note that the backend implementation imposes strict rules on which
690     files are accepted.
691
692     """
693     return backend.UploadFile(*params)
694
695   @staticmethod
696   def perspective_master_info(params):
697     """Query master information.
698
699     """
700     return backend.GetMasterInfo()
701
702   @staticmethod
703   def perspective_write_ssconf_files(params):
704     """Write ssconf files.
705
706     """
707     (values,) = params
708     return backend.WriteSsconfFiles(values)
709
710   # os -----------------------
711
712   @staticmethod
713   def perspective_os_diagnose(params):
714     """Query detailed information about existing OSes.
715
716     """
717     return backend.DiagnoseOS()
718
719   @staticmethod
720   def perspective_os_get(params):
721     """Query information about a given OS.
722
723     """
724     name = params[0]
725     os_obj = backend.OSFromDisk(name)
726     return os_obj.ToDict()
727
728   # hooks -----------------------
729
730   @staticmethod
731   def perspective_hooks_runner(params):
732     """Run hook scripts.
733
734     """
735     hpath, phase, env = params
736     hr = backend.HooksRunner()
737     return hr.RunHooks(hpath, phase, env)
738
739   # iallocator -----------------
740
741   @staticmethod
742   def perspective_iallocator_runner(params):
743     """Run an iallocator script.
744
745     """
746     name, idata = params
747     iar = backend.IAllocatorRunner()
748     return iar.Run(name, idata)
749
750   # test -----------------------
751
752   @staticmethod
753   def perspective_test_delay(params):
754     """Run test delay.
755
756     """
757     duration = params[0]
758     status, rval = utils.TestDelay(duration)
759     if not status:
760       raise backend.RPCFail(rval)
761     return rval
762
763   # file storage ---------------
764
765   @staticmethod
766   def perspective_file_storage_dir_create(params):
767     """Create the file storage directory.
768
769     """
770     file_storage_dir = params[0]
771     return backend.CreateFileStorageDir(file_storage_dir)
772
773   @staticmethod
774   def perspective_file_storage_dir_remove(params):
775     """Remove the file storage directory.
776
777     """
778     file_storage_dir = params[0]
779     return backend.RemoveFileStorageDir(file_storage_dir)
780
781   @staticmethod
782   def perspective_file_storage_dir_rename(params):
783     """Rename the file storage directory.
784
785     """
786     old_file_storage_dir = params[0]
787     new_file_storage_dir = params[1]
788     return backend.RenameFileStorageDir(old_file_storage_dir,
789                                         new_file_storage_dir)
790
791   # jobs ------------------------
792
793   @staticmethod
794   @_RequireJobQueueLock
795   def perspective_jobqueue_update(params):
796     """Update job queue.
797
798     """
799     (file_name, content) = params
800     return backend.JobQueueUpdate(file_name, content)
801
802   @staticmethod
803   @_RequireJobQueueLock
804   def perspective_jobqueue_purge(params):
805     """Purge job queue.
806
807     """
808     return backend.JobQueuePurge()
809
810   @staticmethod
811   @_RequireJobQueueLock
812   def perspective_jobqueue_rename(params):
813     """Rename a job queue file.
814
815     """
816     # TODO: What if a file fails to rename?
817     return [backend.JobQueueRename(old, new) for old, new in params]
818
819   @staticmethod
820   def perspective_jobqueue_set_drain(params):
821     """Set/unset the queue drain flag.
822
823     """
824     drain_flag = params[0]
825     return backend.JobQueueSetDrainFlag(drain_flag)
826
827
828   # hypervisor ---------------
829
830   @staticmethod
831   def perspective_hypervisor_validate_params(params):
832     """Validate the hypervisor parameters.
833
834     """
835     (hvname, hvparams) = params
836     return backend.ValidateHVParams(hvname, hvparams)
837
838   # Crypto
839
840   @staticmethod
841   def perspective_x509_cert_create(params):
842     """Creates a new X509 certificate for SSL/TLS.
843
844     """
845     (validity, ) = params
846     return backend.CreateX509Certificate(validity)
847
848   @staticmethod
849   def perspective_x509_cert_remove(params):
850     """Removes a X509 certificate.
851
852     """
853     (name, ) = params
854     return backend.RemoveX509Certificate(name)
855
856   # Import and export
857
858   @staticmethod
859   def perspective_import_start(params):
860     """Starts an import daemon.
861
862     """
863     (x509_key_name, source_x509_ca, instance, dest, dest_args) = params
864     return backend.StartImportExportDaemon(constants.IEM_IMPORT,
865                                            x509_key_name, source_x509_ca,
866                                            None, None,
867                                            objects.Instance.FromDict(instance),
868                                            dest,
869                                            _DecodeImportExportIO(dest,
870                                                                  dest_args))
871   @staticmethod
872   def perspective_export_start(params):
873     """Starts an export daemon.
874
875     """
876     (x509_key_name, dest_x509_ca, host, port, instance,
877      source, source_args) = params
878     return backend.StartImportExportDaemon(constants.IEM_EXPORT,
879                                            x509_key_name, dest_x509_ca,
880                                            host, port,
881                                            objects.Instance.FromDict(instance),
882                                            source,
883                                            _DecodeImportExportIO(source,
884                                                                  source_args))
885
886   @staticmethod
887   def perspective_impexp_status(params):
888     """Retrieves the status of an import or export daemon.
889
890     """
891     return backend.GetImportExportStatus(params[0])
892
893   @staticmethod
894   def perspective_impexp_cleanup(params):
895     """Cleans up after an import or export.
896
897     """
898     return backend.CleanupImportExport(params[0])
899
900
901 def CheckNoded(_, args):
902   """Initial checks whether to run or exit with a failure.
903
904   """
905   if args: # noded doesn't take any arguments
906     print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
907                           sys.argv[0])
908     sys.exit(constants.EXIT_FAILURE)
909
910
911 def ExecNoded(options, _):
912   """Main node daemon function, executed with the PID file held.
913
914   """
915   # Read SSL certificate
916   if options.ssl:
917     ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
918                                     ssl_cert_path=options.ssl_cert)
919   else:
920     ssl_params = None
921
922   err = _PrepareQueueLock()
923   if err is not None:
924     # this might be some kind of file-system/permission error; while
925     # this breaks the job queue functionality, we shouldn't prevent
926     # startup of the whole node daemon because of this
927     logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)
928
929   mainloop = daemon.Mainloop()
930   server = NodeHttpServer(mainloop, options.bind_address, options.port,
931                           ssl_params=ssl_params, ssl_verify_peer=True)
932   server.Start()
933   try:
934     mainloop.Run()
935   finally:
936     server.Stop()
937
938
939 def main():
940   """Main function for the node daemon.
941
942   """
943   parser = OptionParser(description="Ganeti node daemon",
944                         usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
945                         version="%%prog (ganeti) %s" %
946                         constants.RELEASE_VERSION)
947   dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
948   dirs.append((constants.LOG_OS_DIR, 0750))
949   dirs.append((constants.LOCK_DIR, 1777))
950   dirs.append((constants.CRYPTO_KEYS_DIR, constants.CRYPTO_KEYS_DIR_MODE))
951   dirs.append((constants.IMPORT_EXPORT_DIR, constants.IMPORT_EXPORT_DIR_MODE))
952   daemon.GenericMain(constants.NODED, parser, dirs, CheckNoded, ExecNoded,
953                      default_ssl_cert=constants.NODED_CERT_FILE,
954                      default_ssl_key=constants.NODED_CERT_FILE)
955
956
957 if __name__ == '__main__':
958   main()