Accept both PUT and POST in noded
[ganeti-local] / lib / server / noded.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti node daemon"""
23
24 # pylint: disable=C0103,W0142
25
26 # C0103: Functions in this module need to have a given name structure,
27 # and the name of the daemon doesn't match
28
29 # W0142: Used * or ** magic, since we do use it extensively in this
30 # module
31
32 import os
33 import sys
34 import logging
35 import signal
36 import codecs
37
38 from optparse import OptionParser
39
40 from ganeti import backend
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import errors
44 from ganeti import jstore
45 from ganeti import daemon
46 from ganeti import http
47 from ganeti import utils
48 from ganeti import storage
49 from ganeti import serializer
50 from ganeti import netutils
51
52 import ganeti.http.server # pylint: disable=W0611
53
54
55 queue_lock = None
56
57
58 def _PrepareQueueLock():
59   """Try to prepare the queue lock.
60
61   @return: None for success, otherwise an exception object
62
63   """
64   global queue_lock # pylint: disable=W0603
65
66   if queue_lock is not None:
67     return None
68
69   # Prepare job queue
70   try:
71     queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
72     return None
73   except EnvironmentError, err:
74     return err
75
76
77 def _RequireJobQueueLock(fn):
78   """Decorator for job queue manipulating functions.
79
80   """
81   QUEUE_LOCK_TIMEOUT = 10
82
83   def wrapper(*args, **kwargs):
84     # Locking in exclusive, blocking mode because there could be several
85     # children running at the same time. Waiting up to 10 seconds.
86     if _PrepareQueueLock() is not None:
87       raise errors.JobQueueError("Job queue failed initialization,"
88                                  " cannot update jobs")
89     queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
90     try:
91       return fn(*args, **kwargs)
92     finally:
93       queue_lock.Unlock()
94
95   return wrapper
96
97
98 def _DecodeImportExportIO(ieio, ieioargs):
99   """Decodes import/export I/O information.
100
101   """
102   if ieio == constants.IEIO_RAW_DISK:
103     assert len(ieioargs) == 1
104     return (objects.Disk.FromDict(ieioargs[0]), )
105
106   if ieio == constants.IEIO_SCRIPT:
107     assert len(ieioargs) == 2
108     return (objects.Disk.FromDict(ieioargs[0]), ieioargs[1])
109
110   return ieioargs
111
112
113 class MlockallRequestExecutor(http.server.HttpServerRequestExecutor):
114   """Custom Request Executor class that ensures NodeHttpServer children are
115   locked in ram.
116
117   """
118   def __init__(self, *args, **kwargs):
119     utils.Mlockall()
120
121     http.server.HttpServerRequestExecutor.__init__(self, *args, **kwargs)
122
123
124 class NodeHttpServer(http.server.HttpServer):
125   """The server implementation.
126
127   This class holds all methods exposed over the RPC interface.
128
129   """
130   # too many public methods, and unused args - all methods get params
131   # due to the API
132   # pylint: disable=R0904,W0613
133   def __init__(self, *args, **kwargs):
134     http.server.HttpServer.__init__(self, *args, **kwargs)
135     self.noded_pid = os.getpid()
136
137   def HandleRequest(self, req):
138     """Handle a request.
139
140     """
141     # FIXME: Remove HTTP_PUT in Ganeti 2.7
142     if req.request_method.upper() not in (http.HTTP_PUT, http.HTTP_POST):
143       raise http.HttpBadRequest("Only PUT and POST methods are supported")
144
145     path = req.request_path
146     if path.startswith("/"):
147       path = path[1:]
148
149     method = getattr(self, "perspective_%s" % path, None)
150     if method is None:
151       raise http.HttpNotFound()
152
153     try:
154       result = (True, method(serializer.LoadJson(req.request_body)))
155
156     except backend.RPCFail, err:
157       # our custom failure exception; str(err) works fine if the
158       # exception was constructed with a single argument, and in
159       # this case, err.message == err.args[0] == str(err)
160       result = (False, str(err))
161     except errors.QuitGanetiException, err:
162       # Tell parent to quit
163       logging.info("Shutting down the node daemon, arguments: %s",
164                    str(err.args))
165       os.kill(self.noded_pid, signal.SIGTERM)
166       # And return the error's arguments, which must be already in
167       # correct tuple format
168       result = err.args
169     except Exception, err:
170       logging.exception("Error in RPC call")
171       result = (False, "Error while executing backend function: %s" % str(err))
172
173     return serializer.DumpJson(result)
174
175   # the new block devices  --------------------------
176
177   @staticmethod
178   def perspective_blockdev_create(params):
179     """Create a block device.
180
181     """
182     bdev_s, size, owner, on_primary, info = params
183     bdev = objects.Disk.FromDict(bdev_s)
184     if bdev is None:
185       raise ValueError("can't unserialize data!")
186     return backend.BlockdevCreate(bdev, size, owner, on_primary, info)
187
188   @staticmethod
189   def perspective_blockdev_pause_resume_sync(params):
190     """Pause/resume sync of a block device.
191
192     """
193     disks_s, pause = params
194     disks = [objects.Disk.FromDict(bdev_s) for bdev_s in disks_s]
195     return backend.BlockdevPauseResumeSync(disks, pause)
196
197   @staticmethod
198   def perspective_blockdev_wipe(params):
199     """Wipe a block device.
200
201     """
202     bdev_s, offset, size = params
203     bdev = objects.Disk.FromDict(bdev_s)
204     return backend.BlockdevWipe(bdev, offset, size)
205
206   @staticmethod
207   def perspective_blockdev_remove(params):
208     """Remove a block device.
209
210     """
211     bdev_s = params[0]
212     bdev = objects.Disk.FromDict(bdev_s)
213     return backend.BlockdevRemove(bdev)
214
215   @staticmethod
216   def perspective_blockdev_rename(params):
217     """Remove a block device.
218
219     """
220     devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params[0]]
221     return backend.BlockdevRename(devlist)
222
223   @staticmethod
224   def perspective_blockdev_assemble(params):
225     """Assemble a block device.
226
227     """
228     bdev_s, owner, on_primary, idx = params
229     bdev = objects.Disk.FromDict(bdev_s)
230     if bdev is None:
231       raise ValueError("can't unserialize data!")
232     return backend.BlockdevAssemble(bdev, owner, on_primary, idx)
233
234   @staticmethod
235   def perspective_blockdev_shutdown(params):
236     """Shutdown a block device.
237
238     """
239     bdev_s = params[0]
240     bdev = objects.Disk.FromDict(bdev_s)
241     if bdev is None:
242       raise ValueError("can't unserialize data!")
243     return backend.BlockdevShutdown(bdev)
244
245   @staticmethod
246   def perspective_blockdev_addchildren(params):
247     """Add a child to a mirror device.
248
249     Note: this is only valid for mirror devices. It's the caller's duty
250     to send a correct disk, otherwise we raise an error.
251
252     """
253     bdev_s, ndev_s = params
254     bdev = objects.Disk.FromDict(bdev_s)
255     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
256     if bdev is None or ndevs.count(None) > 0:
257       raise ValueError("can't unserialize data!")
258     return backend.BlockdevAddchildren(bdev, ndevs)
259
260   @staticmethod
261   def perspective_blockdev_removechildren(params):
262     """Remove a child from a mirror device.
263
264     This is only valid for mirror devices, of course. It's the callers
265     duty to send a correct disk, otherwise we raise an error.
266
267     """
268     bdev_s, ndev_s = params
269     bdev = objects.Disk.FromDict(bdev_s)
270     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
271     if bdev is None or ndevs.count(None) > 0:
272       raise ValueError("can't unserialize data!")
273     return backend.BlockdevRemovechildren(bdev, ndevs)
274
275   @staticmethod
276   def perspective_blockdev_getmirrorstatus(params):
277     """Return the mirror status for a list of disks.
278
279     """
280     disks = [objects.Disk.FromDict(dsk_s)
281              for dsk_s in params[0]]
282     return [status.ToDict()
283             for status in backend.BlockdevGetmirrorstatus(disks)]
284
285   @staticmethod
286   def perspective_blockdev_getmirrorstatus_multi(params):
287     """Return the mirror status for a list of disks.
288
289     """
290     (node_disks, ) = params
291
292     disks = [objects.Disk.FromDict(dsk_s) for dsk_s in node_disks]
293
294     result = []
295
296     for (success, status) in backend.BlockdevGetmirrorstatusMulti(disks):
297       if success:
298         result.append((success, status.ToDict()))
299       else:
300         result.append((success, status))
301
302     return result
303
304   @staticmethod
305   def perspective_blockdev_find(params):
306     """Expose the FindBlockDevice functionality for a disk.
307
308     This will try to find but not activate a disk.
309
310     """
311     disk = objects.Disk.FromDict(params[0])
312
313     result = backend.BlockdevFind(disk)
314     if result is None:
315       return None
316
317     return result.ToDict()
318
319   @staticmethod
320   def perspective_blockdev_snapshot(params):
321     """Create a snapshot device.
322
323     Note that this is only valid for LVM disks, if we get passed
324     something else we raise an exception. The snapshot device can be
325     remove by calling the generic block device remove call.
326
327     """
328     cfbd = objects.Disk.FromDict(params[0])
329     return backend.BlockdevSnapshot(cfbd)
330
331   @staticmethod
332   def perspective_blockdev_grow(params):
333     """Grow a stack of devices.
334
335     """
336     cfbd = objects.Disk.FromDict(params[0])
337     amount = params[1]
338     dryrun = params[2]
339     return backend.BlockdevGrow(cfbd, amount, dryrun)
340
341   @staticmethod
342   def perspective_blockdev_close(params):
343     """Closes the given block devices.
344
345     """
346     disks = [objects.Disk.FromDict(cf) for cf in params[1]]
347     return backend.BlockdevClose(params[0], disks)
348
349   @staticmethod
350   def perspective_blockdev_getsize(params):
351     """Compute the sizes of the given block devices.
352
353     """
354     disks = [objects.Disk.FromDict(cf) for cf in params[0]]
355     return backend.BlockdevGetsize(disks)
356
357   @staticmethod
358   def perspective_blockdev_export(params):
359     """Compute the sizes of the given block devices.
360
361     """
362     disk = objects.Disk.FromDict(params[0])
363     dest_node, dest_path, cluster_name = params[1:]
364     return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)
365
366   # blockdev/drbd specific methods ----------
367
368   @staticmethod
369   def perspective_drbd_disconnect_net(params):
370     """Disconnects the network connection of drbd disks.
371
372     Note that this is only valid for drbd disks, so the members of the
373     disk list must all be drbd devices.
374
375     """
376     nodes_ip, disks = params
377     disks = [objects.Disk.FromDict(cf) for cf in disks]
378     return backend.DrbdDisconnectNet(nodes_ip, disks)
379
380   @staticmethod
381   def perspective_drbd_attach_net(params):
382     """Attaches the network connection of drbd disks.
383
384     Note that this is only valid for drbd disks, so the members of the
385     disk list must all be drbd devices.
386
387     """
388     nodes_ip, disks, instance_name, multimaster = params
389     disks = [objects.Disk.FromDict(cf) for cf in disks]
390     return backend.DrbdAttachNet(nodes_ip, disks,
391                                      instance_name, multimaster)
392
393   @staticmethod
394   def perspective_drbd_wait_sync(params):
395     """Wait until DRBD disks are synched.
396
397     Note that this is only valid for drbd disks, so the members of the
398     disk list must all be drbd devices.
399
400     """
401     nodes_ip, disks = params
402     disks = [objects.Disk.FromDict(cf) for cf in disks]
403     return backend.DrbdWaitSync(nodes_ip, disks)
404
405   @staticmethod
406   def perspective_drbd_helper(params):
407     """Query drbd helper.
408
409     """
410     return backend.GetDrbdUsermodeHelper()
411
412   # export/import  --------------------------
413
414   @staticmethod
415   def perspective_finalize_export(params):
416     """Expose the finalize export functionality.
417
418     """
419     instance = objects.Instance.FromDict(params[0])
420
421     snap_disks = []
422     for disk in params[1]:
423       if isinstance(disk, bool):
424         snap_disks.append(disk)
425       else:
426         snap_disks.append(objects.Disk.FromDict(disk))
427
428     return backend.FinalizeExport(instance, snap_disks)
429
430   @staticmethod
431   def perspective_export_info(params):
432     """Query information about an existing export on this node.
433
434     The given path may not contain an export, in which case we return
435     None.
436
437     """
438     path = params[0]
439     return backend.ExportInfo(path)
440
441   @staticmethod
442   def perspective_export_list(params):
443     """List the available exports on this node.
444
445     Note that as opposed to export_info, which may query data about an
446     export in any path, this only queries the standard Ganeti path
447     (constants.EXPORT_DIR).
448
449     """
450     return backend.ListExports()
451
452   @staticmethod
453   def perspective_export_remove(params):
454     """Remove an export.
455
456     """
457     export = params[0]
458     return backend.RemoveExport(export)
459
460   # block device ---------------------
461   @staticmethod
462   def perspective_bdev_sizes(params):
463     """Query the list of block devices
464
465     """
466     devices = params[0]
467     return backend.GetBlockDevSizes(devices)
468
469   # volume  --------------------------
470
471   @staticmethod
472   def perspective_lv_list(params):
473     """Query the list of logical volumes in a given volume group.
474
475     """
476     vgname = params[0]
477     return backend.GetVolumeList(vgname)
478
479   @staticmethod
480   def perspective_vg_list(params):
481     """Query the list of volume groups.
482
483     """
484     return backend.ListVolumeGroups()
485
486   # Storage --------------------------
487
488   @staticmethod
489   def perspective_storage_list(params):
490     """Get list of storage units.
491
492     """
493     (su_name, su_args, name, fields) = params
494     return storage.GetStorage(su_name, *su_args).List(name, fields)
495
496   @staticmethod
497   def perspective_storage_modify(params):
498     """Modify a storage unit.
499
500     """
501     (su_name, su_args, name, changes) = params
502     return storage.GetStorage(su_name, *su_args).Modify(name, changes)
503
504   @staticmethod
505   def perspective_storage_execute(params):
506     """Execute an operation on a storage unit.
507
508     """
509     (su_name, su_args, name, op) = params
510     return storage.GetStorage(su_name, *su_args).Execute(name, op)
511
512   # bridge  --------------------------
513
514   @staticmethod
515   def perspective_bridges_exist(params):
516     """Check if all bridges given exist on this node.
517
518     """
519     bridges_list = params[0]
520     return backend.BridgesExist(bridges_list)
521
522   # instance  --------------------------
523
524   @staticmethod
525   def perspective_instance_os_add(params):
526     """Install an OS on a given instance.
527
528     """
529     inst_s = params[0]
530     inst = objects.Instance.FromDict(inst_s)
531     reinstall = params[1]
532     debug = params[2]
533     return backend.InstanceOsAdd(inst, reinstall, debug)
534
535   @staticmethod
536   def perspective_instance_run_rename(params):
537     """Runs the OS rename script for an instance.
538
539     """
540     inst_s, old_name, debug = params
541     inst = objects.Instance.FromDict(inst_s)
542     return backend.RunRenameInstance(inst, old_name, debug)
543
544   @staticmethod
545   def perspective_instance_shutdown(params):
546     """Shutdown an instance.
547
548     """
549     instance = objects.Instance.FromDict(params[0])
550     timeout = params[1]
551     return backend.InstanceShutdown(instance, timeout)
552
553   @staticmethod
554   def perspective_instance_start(params):
555     """Start an instance.
556
557     """
558     (instance_name, startup_paused) = params
559     instance = objects.Instance.FromDict(instance_name)
560     return backend.StartInstance(instance, startup_paused)
561
562   @staticmethod
563   def perspective_migration_info(params):
564     """Gather information about an instance to be migrated.
565
566     """
567     instance = objects.Instance.FromDict(params[0])
568     return backend.MigrationInfo(instance)
569
570   @staticmethod
571   def perspective_accept_instance(params):
572     """Prepare the node to accept an instance.
573
574     """
575     instance, info, target = params
576     instance = objects.Instance.FromDict(instance)
577     return backend.AcceptInstance(instance, info, target)
578
579   @staticmethod
580   def perspective_instance_finalize_migration_dst(params):
581     """Finalize the instance migration on the destination node.
582
583     """
584     instance, info, success = params
585     instance = objects.Instance.FromDict(instance)
586     return backend.FinalizeMigrationDst(instance, info, success)
587
588   @staticmethod
589   def perspective_instance_migrate(params):
590     """Migrates an instance.
591
592     """
593     instance, target, live = params
594     instance = objects.Instance.FromDict(instance)
595     return backend.MigrateInstance(instance, target, live)
596
597   @staticmethod
598   def perspective_instance_finalize_migration_src(params):
599     """Finalize the instance migration on the source node.
600
601     """
602     instance, success, live = params
603     instance = objects.Instance.FromDict(instance)
604     return backend.FinalizeMigrationSource(instance, success, live)
605
606   @staticmethod
607   def perspective_instance_get_migration_status(params):
608     """Reports migration status.
609
610     """
611     instance = objects.Instance.FromDict(params[0])
612     return backend.GetMigrationStatus(instance).ToDict()
613
614   @staticmethod
615   def perspective_instance_reboot(params):
616     """Reboot an instance.
617
618     """
619     instance = objects.Instance.FromDict(params[0])
620     reboot_type = params[1]
621     shutdown_timeout = params[2]
622     return backend.InstanceReboot(instance, reboot_type, shutdown_timeout)
623
624   @staticmethod
625   def perspective_instance_info(params):
626     """Query instance information.
627
628     """
629     return backend.GetInstanceInfo(params[0], params[1])
630
631   @staticmethod
632   def perspective_instance_migratable(params):
633     """Query whether the specified instance can be migrated.
634
635     """
636     instance = objects.Instance.FromDict(params[0])
637     return backend.GetInstanceMigratable(instance)
638
639   @staticmethod
640   def perspective_all_instances_info(params):
641     """Query information about all instances.
642
643     """
644     return backend.GetAllInstancesInfo(params[0])
645
646   @staticmethod
647   def perspective_instance_list(params):
648     """Query the list of running instances.
649
650     """
651     return backend.GetInstanceList(params[0])
652
653   # node --------------------------
654
655   @staticmethod
656   def perspective_node_has_ip_address(params):
657     """Checks if a node has the given ip address.
658
659     """
660     return netutils.IPAddress.Own(params[0])
661
662   @staticmethod
663   def perspective_node_info(params):
664     """Query node information.
665
666     """
667     (vg_names, hv_names) = params
668     return backend.GetNodeInfo(vg_names, hv_names)
669
670   @staticmethod
671   def perspective_etc_hosts_modify(params):
672     """Modify a node entry in /etc/hosts.
673
674     """
675     backend.EtcHostsModify(params[0], params[1], params[2])
676
677     return True
678
679   @staticmethod
680   def perspective_node_verify(params):
681     """Run a verify sequence on this node.
682
683     """
684     return backend.VerifyNode(params[0], params[1])
685
686   @staticmethod
687   def perspective_node_start_master_daemons(params):
688     """Start the master daemons on this node.
689
690     """
691     return backend.StartMasterDaemons(params[0])
692
693   @staticmethod
694   def perspective_node_activate_master_ip(params):
695     """Activate the master IP on this node.
696
697     """
698     master_params = objects.MasterNetworkParameters.FromDict(params[0])
699     return backend.ActivateMasterIp(master_params, params[1])
700
701   @staticmethod
702   def perspective_node_deactivate_master_ip(params):
703     """Deactivate the master IP on this node.
704
705     """
706     master_params = objects.MasterNetworkParameters.FromDict(params[0])
707     return backend.DeactivateMasterIp(master_params, params[1])
708
709   @staticmethod
710   def perspective_node_stop_master(params):
711     """Stops master daemons on this node.
712
713     """
714     return backend.StopMasterDaemons()
715
716   @staticmethod
717   def perspective_node_change_master_netmask(params):
718     """Change the master IP netmask.
719
720     """
721     return backend.ChangeMasterNetmask(params[0], params[1], params[2],
722                                        params[3])
723
724   @staticmethod
725   def perspective_node_leave_cluster(params):
726     """Cleanup after leaving a cluster.
727
728     """
729     return backend.LeaveCluster(params[0])
730
731   @staticmethod
732   def perspective_node_volumes(params):
733     """Query the list of all logical volume groups.
734
735     """
736     return backend.NodeVolumes()
737
738   @staticmethod
739   def perspective_node_demote_from_mc(params):
740     """Demote a node from the master candidate role.
741
742     """
743     return backend.DemoteFromMC()
744
745   @staticmethod
746   def perspective_node_powercycle(params):
747     """Tries to powercycle the nod.
748
749     """
750     hypervisor_type = params[0]
751     return backend.PowercycleNode(hypervisor_type)
752
753   # cluster --------------------------
754
755   @staticmethod
756   def perspective_version(params):
757     """Query version information.
758
759     """
760     return constants.PROTOCOL_VERSION
761
762   @staticmethod
763   def perspective_upload_file(params):
764     """Upload a file.
765
766     Note that the backend implementation imposes strict rules on which
767     files are accepted.
768
769     """
770     return backend.UploadFile(*(params[0]))
771
772   @staticmethod
773   def perspective_master_info(params):
774     """Query master information.
775
776     """
777     return backend.GetMasterInfo()
778
779   @staticmethod
780   def perspective_run_oob(params):
781     """Runs oob on node.
782
783     """
784     output = backend.RunOob(params[0], params[1], params[2], params[3])
785     if output:
786       result = serializer.LoadJson(output)
787     else:
788       result = None
789     return result
790
791   @staticmethod
792   def perspective_write_ssconf_files(params):
793     """Write ssconf files.
794
795     """
796     (values,) = params
797     return backend.WriteSsconfFiles(values)
798
799   # os -----------------------
800
801   @staticmethod
802   def perspective_os_diagnose(params):
803     """Query detailed information about existing OSes.
804
805     """
806     return backend.DiagnoseOS()
807
808   @staticmethod
809   def perspective_os_get(params):
810     """Query information about a given OS.
811
812     """
813     name = params[0]
814     os_obj = backend.OSFromDisk(name)
815     return os_obj.ToDict()
816
817   @staticmethod
818   def perspective_os_validate(params):
819     """Run a given OS' validation routine.
820
821     """
822     required, name, checks, params = params
823     return backend.ValidateOS(required, name, checks, params)
824
825   # hooks -----------------------
826
827   @staticmethod
828   def perspective_hooks_runner(params):
829     """Run hook scripts.
830
831     """
832     hpath, phase, env = params
833     hr = backend.HooksRunner()
834     return hr.RunHooks(hpath, phase, env)
835
836   # iallocator -----------------
837
838   @staticmethod
839   def perspective_iallocator_runner(params):
840     """Run an iallocator script.
841
842     """
843     name, idata = params
844     iar = backend.IAllocatorRunner()
845     return iar.Run(name, idata)
846
847   # test -----------------------
848
849   @staticmethod
850   def perspective_test_delay(params):
851     """Run test delay.
852
853     """
854     duration = params[0]
855     status, rval = utils.TestDelay(duration)
856     if not status:
857       raise backend.RPCFail(rval)
858     return rval
859
860   # file storage ---------------
861
862   @staticmethod
863   def perspective_file_storage_dir_create(params):
864     """Create the file storage directory.
865
866     """
867     file_storage_dir = params[0]
868     return backend.CreateFileStorageDir(file_storage_dir)
869
870   @staticmethod
871   def perspective_file_storage_dir_remove(params):
872     """Remove the file storage directory.
873
874     """
875     file_storage_dir = params[0]
876     return backend.RemoveFileStorageDir(file_storage_dir)
877
878   @staticmethod
879   def perspective_file_storage_dir_rename(params):
880     """Rename the file storage directory.
881
882     """
883     old_file_storage_dir = params[0]
884     new_file_storage_dir = params[1]
885     return backend.RenameFileStorageDir(old_file_storage_dir,
886                                         new_file_storage_dir)
887
888   # jobs ------------------------
889
890   @staticmethod
891   @_RequireJobQueueLock
892   def perspective_jobqueue_update(params):
893     """Update job queue.
894
895     """
896     (file_name, content) = params
897     return backend.JobQueueUpdate(file_name, content)
898
899   @staticmethod
900   @_RequireJobQueueLock
901   def perspective_jobqueue_purge(params):
902     """Purge job queue.
903
904     """
905     return backend.JobQueuePurge()
906
907   @staticmethod
908   @_RequireJobQueueLock
909   def perspective_jobqueue_rename(params):
910     """Rename a job queue file.
911
912     """
913     # TODO: What if a file fails to rename?
914     return [backend.JobQueueRename(old, new) for old, new in params[0]]
915
916   # hypervisor ---------------
917
918   @staticmethod
919   def perspective_hypervisor_validate_params(params):
920     """Validate the hypervisor parameters.
921
922     """
923     (hvname, hvparams) = params
924     return backend.ValidateHVParams(hvname, hvparams)
925
926   # Crypto
927
928   @staticmethod
929   def perspective_x509_cert_create(params):
930     """Creates a new X509 certificate for SSL/TLS.
931
932     """
933     (validity, ) = params
934     return backend.CreateX509Certificate(validity)
935
936   @staticmethod
937   def perspective_x509_cert_remove(params):
938     """Removes a X509 certificate.
939
940     """
941     (name, ) = params
942     return backend.RemoveX509Certificate(name)
943
944   # Import and export
945
946   @staticmethod
947   def perspective_import_start(params):
948     """Starts an import daemon.
949
950     """
951     (opts_s, instance, component, (dest, dest_args)) = params
952
953     opts = objects.ImportExportOptions.FromDict(opts_s)
954
955     return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts,
956                                            None, None,
957                                            objects.Instance.FromDict(instance),
958                                            component, dest,
959                                            _DecodeImportExportIO(dest,
960                                                                  dest_args))
961
962   @staticmethod
963   def perspective_export_start(params):
964     """Starts an export daemon.
965
966     """
967     (opts_s, host, port, instance, component, (source, source_args)) = params
968
969     opts = objects.ImportExportOptions.FromDict(opts_s)
970
971     return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts,
972                                            host, port,
973                                            objects.Instance.FromDict(instance),
974                                            component, source,
975                                            _DecodeImportExportIO(source,
976                                                                  source_args))
977
978   @staticmethod
979   def perspective_impexp_status(params):
980     """Retrieves the status of an import or export daemon.
981
982     """
983     return backend.GetImportExportStatus(params[0])
984
985   @staticmethod
986   def perspective_impexp_abort(params):
987     """Aborts an import or export.
988
989     """
990     return backend.AbortImportExport(params[0])
991
992   @staticmethod
993   def perspective_impexp_cleanup(params):
994     """Cleans up after an import or export.
995
996     """
997     return backend.CleanupImportExport(params[0])
998
999
1000 def CheckNoded(_, args):
1001   """Initial checks whether to run or exit with a failure.
1002
1003   """
1004   if args: # noded doesn't take any arguments
1005     print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
1006                           sys.argv[0])
1007     sys.exit(constants.EXIT_FAILURE)
1008   try:
1009     codecs.lookup("string-escape")
1010   except LookupError:
1011     print >> sys.stderr, ("Can't load the string-escape code which is part"
1012                           " of the Python installation. Is your installation"
1013                           " complete/correct? Aborting.")
1014     sys.exit(constants.EXIT_FAILURE)
1015
1016
1017 def PrepNoded(options, _):
1018   """Preparation node daemon function, executed with the PID file held.
1019
1020   """
1021   if options.mlock:
1022     request_executor_class = MlockallRequestExecutor
1023     try:
1024       utils.Mlockall()
1025     except errors.NoCtypesError:
1026       logging.warning("Cannot set memory lock, ctypes module not found")
1027       request_executor_class = http.server.HttpServerRequestExecutor
1028   else:
1029     request_executor_class = http.server.HttpServerRequestExecutor
1030
1031   # Read SSL certificate
1032   if options.ssl:
1033     ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
1034                                     ssl_cert_path=options.ssl_cert)
1035   else:
1036     ssl_params = None
1037
1038   err = _PrepareQueueLock()
1039   if err is not None:
1040     # this might be some kind of file-system/permission error; while
1041     # this breaks the job queue functionality, we shouldn't prevent
1042     # startup of the whole node daemon because of this
1043     logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)
1044
1045   mainloop = daemon.Mainloop()
1046   server = NodeHttpServer(mainloop, options.bind_address, options.port,
1047                           ssl_params=ssl_params, ssl_verify_peer=True,
1048                           request_executor_class=request_executor_class)
1049   server.Start()
1050   return (mainloop, server)
1051
1052
1053 def ExecNoded(options, args, prep_data): # pylint: disable=W0613
1054   """Main node daemon function, executed with the PID file held.
1055
1056   """
1057   (mainloop, server) = prep_data
1058   try:
1059     mainloop.Run()
1060   finally:
1061     server.Stop()
1062
1063
1064 def Main():
1065   """Main function for the node daemon.
1066
1067   """
1068   parser = OptionParser(description="Ganeti node daemon",
1069                         usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
1070                         version="%%prog (ganeti) %s" %
1071                         constants.RELEASE_VERSION)
1072   parser.add_option("--no-mlock", dest="mlock",
1073                     help="Do not mlock the node memory in ram",
1074                     default=True, action="store_false")
1075
1076   daemon.GenericMain(constants.NODED, parser, CheckNoded, PrepNoded, ExecNoded,
1077                      default_ssl_cert=constants.NODED_CERT_FILE,
1078                      default_ssl_key=constants.NODED_CERT_FILE,
1079                      console_logging=True)