Fix iallocator group objects
[ganeti-local] / lib / server / noded.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti node daemon"""
23
24 # pylint: disable=C0103,W0142
25
26 # C0103: Functions in this module need to have a given name structure,
27 # and the name of the daemon doesn't match
28
29 # W0142: Used * or ** magic, since we do use it extensively in this
30 # module
31
32 import os
33 import sys
34 import logging
35 import signal
36 import codecs
37
38 from optparse import OptionParser
39
40 from ganeti import backend
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import errors
44 from ganeti import jstore
45 from ganeti import daemon
46 from ganeti import http
47 from ganeti import utils
48 from ganeti import storage
49 from ganeti import serializer
50 from ganeti import netutils
51
52 import ganeti.http.server # pylint: disable=W0611
53
54
55 queue_lock = None
56
57
58 def _PrepareQueueLock():
59   """Try to prepare the queue lock.
60
61   @return: None for success, otherwise an exception object
62
63   """
64   global queue_lock # pylint: disable=W0603
65
66   if queue_lock is not None:
67     return None
68
69   # Prepare job queue
70   try:
71     queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
72     return None
73   except EnvironmentError, err:
74     return err
75
76
77 def _RequireJobQueueLock(fn):
78   """Decorator for job queue manipulating functions.
79
80   """
81   QUEUE_LOCK_TIMEOUT = 10
82
83   def wrapper(*args, **kwargs):
84     # Locking in exclusive, blocking mode because there could be several
85     # children running at the same time. Waiting up to 10 seconds.
86     if _PrepareQueueLock() is not None:
87       raise errors.JobQueueError("Job queue failed initialization,"
88                                  " cannot update jobs")
89     queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
90     try:
91       return fn(*args, **kwargs)
92     finally:
93       queue_lock.Unlock()
94
95   return wrapper
96
97
98 def _DecodeImportExportIO(ieio, ieioargs):
99   """Decodes import/export I/O information.
100
101   """
102   if ieio == constants.IEIO_RAW_DISK:
103     assert len(ieioargs) == 1
104     return (objects.Disk.FromDict(ieioargs[0]), )
105
106   if ieio == constants.IEIO_SCRIPT:
107     assert len(ieioargs) == 2
108     return (objects.Disk.FromDict(ieioargs[0]), ieioargs[1])
109
110   return ieioargs
111
112
113 class MlockallRequestExecutor(http.server.HttpServerRequestExecutor):
114   """Custom Request Executor class that ensures NodeHttpServer children are
115   locked in ram.
116
117   """
118   def __init__(self, *args, **kwargs):
119     utils.Mlockall()
120
121     http.server.HttpServerRequestExecutor.__init__(self, *args, **kwargs)
122
123
124 class NodeHttpServer(http.server.HttpServer):
125   """The server implementation.
126
127   This class holds all methods exposed over the RPC interface.
128
129   """
130   # too many public methods, and unused args - all methods get params
131   # due to the API
132   # pylint: disable=R0904,W0613
133   def __init__(self, *args, **kwargs):
134     http.server.HttpServer.__init__(self, *args, **kwargs)
135     self.noded_pid = os.getpid()
136
137   def HandleRequest(self, req):
138     """Handle a request.
139
140     """
141     # FIXME: Remove HTTP_PUT in Ganeti 2.7
142     if req.request_method.upper() not in (http.HTTP_PUT, http.HTTP_POST):
143       raise http.HttpBadRequest("Only PUT and POST methods are supported")
144
145     path = req.request_path
146     if path.startswith("/"):
147       path = path[1:]
148
149     method = getattr(self, "perspective_%s" % path, None)
150     if method is None:
151       raise http.HttpNotFound()
152
153     try:
154       result = (True, method(serializer.LoadJson(req.request_body)))
155
156     except backend.RPCFail, err:
157       # our custom failure exception; str(err) works fine if the
158       # exception was constructed with a single argument, and in
159       # this case, err.message == err.args[0] == str(err)
160       result = (False, str(err))
161     except errors.QuitGanetiException, err:
162       # Tell parent to quit
163       logging.info("Shutting down the node daemon, arguments: %s",
164                    str(err.args))
165       os.kill(self.noded_pid, signal.SIGTERM)
166       # And return the error's arguments, which must be already in
167       # correct tuple format
168       result = err.args
169     except Exception, err:
170       logging.exception("Error in RPC call")
171       result = (False, "Error while executing backend function: %s" % str(err))
172
173     return serializer.DumpJson(result)
174
175   # the new block devices  --------------------------
176
177   @staticmethod
178   def perspective_blockdev_create(params):
179     """Create a block device.
180
181     """
182     bdev_s, size, owner, on_primary, info = params
183     bdev = objects.Disk.FromDict(bdev_s)
184     if bdev is None:
185       raise ValueError("can't unserialize data!")
186     return backend.BlockdevCreate(bdev, size, owner, on_primary, info)
187
188   @staticmethod
189   def perspective_blockdev_pause_resume_sync(params):
190     """Pause/resume sync of a block device.
191
192     """
193     disks_s, pause = params
194     disks = [objects.Disk.FromDict(bdev_s) for bdev_s in disks_s]
195     return backend.BlockdevPauseResumeSync(disks, pause)
196
197   @staticmethod
198   def perspective_blockdev_wipe(params):
199     """Wipe a block device.
200
201     """
202     bdev_s, offset, size = params
203     bdev = objects.Disk.FromDict(bdev_s)
204     return backend.BlockdevWipe(bdev, offset, size)
205
206   @staticmethod
207   def perspective_blockdev_remove(params):
208     """Remove a block device.
209
210     """
211     bdev_s = params[0]
212     bdev = objects.Disk.FromDict(bdev_s)
213     return backend.BlockdevRemove(bdev)
214
215   @staticmethod
216   def perspective_blockdev_rename(params):
217     """Remove a block device.
218
219     """
220     devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params[0]]
221     return backend.BlockdevRename(devlist)
222
223   @staticmethod
224   def perspective_blockdev_assemble(params):
225     """Assemble a block device.
226
227     """
228     bdev_s, owner, on_primary, idx = params
229     bdev = objects.Disk.FromDict(bdev_s)
230     if bdev is None:
231       raise ValueError("can't unserialize data!")
232     return backend.BlockdevAssemble(bdev, owner, on_primary, idx)
233
234   @staticmethod
235   def perspective_blockdev_shutdown(params):
236     """Shutdown a block device.
237
238     """
239     bdev_s = params[0]
240     bdev = objects.Disk.FromDict(bdev_s)
241     if bdev is None:
242       raise ValueError("can't unserialize data!")
243     return backend.BlockdevShutdown(bdev)
244
245   @staticmethod
246   def perspective_blockdev_addchildren(params):
247     """Add a child to a mirror device.
248
249     Note: this is only valid for mirror devices. It's the caller's duty
250     to send a correct disk, otherwise we raise an error.
251
252     """
253     bdev_s, ndev_s = params
254     bdev = objects.Disk.FromDict(bdev_s)
255     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
256     if bdev is None or ndevs.count(None) > 0:
257       raise ValueError("can't unserialize data!")
258     return backend.BlockdevAddchildren(bdev, ndevs)
259
260   @staticmethod
261   def perspective_blockdev_removechildren(params):
262     """Remove a child from a mirror device.
263
264     This is only valid for mirror devices, of course. It's the callers
265     duty to send a correct disk, otherwise we raise an error.
266
267     """
268     bdev_s, ndev_s = params
269     bdev = objects.Disk.FromDict(bdev_s)
270     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
271     if bdev is None or ndevs.count(None) > 0:
272       raise ValueError("can't unserialize data!")
273     return backend.BlockdevRemovechildren(bdev, ndevs)
274
275   @staticmethod
276   def perspective_blockdev_getmirrorstatus(params):
277     """Return the mirror status for a list of disks.
278
279     """
280     disks = [objects.Disk.FromDict(dsk_s)
281              for dsk_s in params[0]]
282     return [status.ToDict()
283             for status in backend.BlockdevGetmirrorstatus(disks)]
284
285   @staticmethod
286   def perspective_blockdev_getmirrorstatus_multi(params):
287     """Return the mirror status for a list of disks.
288
289     """
290     (node_disks, ) = params
291
292     disks = [objects.Disk.FromDict(dsk_s) for dsk_s in node_disks]
293
294     result = []
295
296     for (success, status) in backend.BlockdevGetmirrorstatusMulti(disks):
297       if success:
298         result.append((success, status.ToDict()))
299       else:
300         result.append((success, status))
301
302     return result
303
304   @staticmethod
305   def perspective_blockdev_find(params):
306     """Expose the FindBlockDevice functionality for a disk.
307
308     This will try to find but not activate a disk.
309
310     """
311     disk = objects.Disk.FromDict(params[0])
312
313     result = backend.BlockdevFind(disk)
314     if result is None:
315       return None
316
317     return result.ToDict()
318
319   @staticmethod
320   def perspective_blockdev_snapshot(params):
321     """Create a snapshot device.
322
323     Note that this is only valid for LVM disks, if we get passed
324     something else we raise an exception. The snapshot device can be
325     remove by calling the generic block device remove call.
326
327     """
328     cfbd = objects.Disk.FromDict(params[0])
329     return backend.BlockdevSnapshot(cfbd)
330
331   @staticmethod
332   def perspective_blockdev_grow(params):
333     """Grow a stack of devices.
334
335     """
336     cfbd = objects.Disk.FromDict(params[0])
337     amount = params[1]
338     dryrun = params[2]
339     return backend.BlockdevGrow(cfbd, amount, dryrun)
340
341   @staticmethod
342   def perspective_blockdev_close(params):
343     """Closes the given block devices.
344
345     """
346     disks = [objects.Disk.FromDict(cf) for cf in params[1]]
347     return backend.BlockdevClose(params[0], disks)
348
349   @staticmethod
350   def perspective_blockdev_getsize(params):
351     """Compute the sizes of the given block devices.
352
353     """
354     disks = [objects.Disk.FromDict(cf) for cf in params[0]]
355     return backend.BlockdevGetsize(disks)
356
357   @staticmethod
358   def perspective_blockdev_export(params):
359     """Compute the sizes of the given block devices.
360
361     """
362     disk = objects.Disk.FromDict(params[0])
363     dest_node, dest_path, cluster_name = params[1:]
364     return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)
365
366   # blockdev/drbd specific methods ----------
367
368   @staticmethod
369   def perspective_drbd_disconnect_net(params):
370     """Disconnects the network connection of drbd disks.
371
372     Note that this is only valid for drbd disks, so the members of the
373     disk list must all be drbd devices.
374
375     """
376     nodes_ip, disks = params
377     disks = [objects.Disk.FromDict(cf) for cf in disks]
378     return backend.DrbdDisconnectNet(nodes_ip, disks)
379
380   @staticmethod
381   def perspective_drbd_attach_net(params):
382     """Attaches the network connection of drbd disks.
383
384     Note that this is only valid for drbd disks, so the members of the
385     disk list must all be drbd devices.
386
387     """
388     nodes_ip, disks, instance_name, multimaster = params
389     disks = [objects.Disk.FromDict(cf) for cf in disks]
390     return backend.DrbdAttachNet(nodes_ip, disks,
391                                      instance_name, multimaster)
392
393   @staticmethod
394   def perspective_drbd_wait_sync(params):
395     """Wait until DRBD disks are synched.
396
397     Note that this is only valid for drbd disks, so the members of the
398     disk list must all be drbd devices.
399
400     """
401     nodes_ip, disks = params
402     disks = [objects.Disk.FromDict(cf) for cf in disks]
403     return backend.DrbdWaitSync(nodes_ip, disks)
404
405   @staticmethod
406   def perspective_drbd_helper(params):
407     """Query drbd helper.
408
409     """
410     return backend.GetDrbdUsermodeHelper()
411
412   # export/import  --------------------------
413
414   @staticmethod
415   def perspective_finalize_export(params):
416     """Expose the finalize export functionality.
417
418     """
419     instance = objects.Instance.FromDict(params[0])
420
421     snap_disks = []
422     for disk in params[1]:
423       if isinstance(disk, bool):
424         snap_disks.append(disk)
425       else:
426         snap_disks.append(objects.Disk.FromDict(disk))
427
428     return backend.FinalizeExport(instance, snap_disks)
429
430   @staticmethod
431   def perspective_export_info(params):
432     """Query information about an existing export on this node.
433
434     The given path may not contain an export, in which case we return
435     None.
436
437     """
438     path = params[0]
439     return backend.ExportInfo(path)
440
441   @staticmethod
442   def perspective_export_list(params):
443     """List the available exports on this node.
444
445     Note that as opposed to export_info, which may query data about an
446     export in any path, this only queries the standard Ganeti path
447     (constants.EXPORT_DIR).
448
449     """
450     return backend.ListExports()
451
452   @staticmethod
453   def perspective_export_remove(params):
454     """Remove an export.
455
456     """
457     export = params[0]
458     return backend.RemoveExport(export)
459
460   # block device ---------------------
461   @staticmethod
462   def perspective_bdev_sizes(params):
463     """Query the list of block devices
464
465     """
466     devices = params[0]
467     return backend.GetBlockDevSizes(devices)
468
469   # volume  --------------------------
470
471   @staticmethod
472   def perspective_lv_list(params):
473     """Query the list of logical volumes in a given volume group.
474
475     """
476     vgname = params[0]
477     return backend.GetVolumeList(vgname)
478
479   @staticmethod
480   def perspective_vg_list(params):
481     """Query the list of volume groups.
482
483     """
484     return backend.ListVolumeGroups()
485
486   # Storage --------------------------
487
488   @staticmethod
489   def perspective_storage_list(params):
490     """Get list of storage units.
491
492     """
493     (su_name, su_args, name, fields) = params
494     return storage.GetStorage(su_name, *su_args).List(name, fields)
495
496   @staticmethod
497   def perspective_storage_modify(params):
498     """Modify a storage unit.
499
500     """
501     (su_name, su_args, name, changes) = params
502     return storage.GetStorage(su_name, *su_args).Modify(name, changes)
503
504   @staticmethod
505   def perspective_storage_execute(params):
506     """Execute an operation on a storage unit.
507
508     """
509     (su_name, su_args, name, op) = params
510     return storage.GetStorage(su_name, *su_args).Execute(name, op)
511
512   # bridge  --------------------------
513
514   @staticmethod
515   def perspective_bridges_exist(params):
516     """Check if all bridges given exist on this node.
517
518     """
519     bridges_list = params[0]
520     return backend.BridgesExist(bridges_list)
521
522   # instance  --------------------------
523
524   @staticmethod
525   def perspective_instance_os_add(params):
526     """Install an OS on a given instance.
527
528     """
529     inst_s = params[0]
530     inst = objects.Instance.FromDict(inst_s)
531     reinstall = params[1]
532     debug = params[2]
533     return backend.InstanceOsAdd(inst, reinstall, debug)
534
535   @staticmethod
536   def perspective_instance_run_rename(params):
537     """Runs the OS rename script for an instance.
538
539     """
540     inst_s, old_name, debug = params
541     inst = objects.Instance.FromDict(inst_s)
542     return backend.RunRenameInstance(inst, old_name, debug)
543
544   @staticmethod
545   def perspective_instance_shutdown(params):
546     """Shutdown an instance.
547
548     """
549     instance = objects.Instance.FromDict(params[0])
550     timeout = params[1]
551     return backend.InstanceShutdown(instance, timeout)
552
553   @staticmethod
554   def perspective_instance_start(params):
555     """Start an instance.
556
557     """
558     (instance_name, startup_paused) = params
559     instance = objects.Instance.FromDict(instance_name)
560     return backend.StartInstance(instance, startup_paused)
561
562   @staticmethod
563   def perspective_migration_info(params):
564     """Gather information about an instance to be migrated.
565
566     """
567     instance = objects.Instance.FromDict(params[0])
568     return backend.MigrationInfo(instance)
569
570   @staticmethod
571   def perspective_accept_instance(params):
572     """Prepare the node to accept an instance.
573
574     """
575     instance, info, target = params
576     instance = objects.Instance.FromDict(instance)
577     return backend.AcceptInstance(instance, info, target)
578
579   @staticmethod
580   def perspective_instance_finalize_migration_dst(params):
581     """Finalize the instance migration on the destination node.
582
583     """
584     instance, info, success = params
585     instance = objects.Instance.FromDict(instance)
586     return backend.FinalizeMigrationDst(instance, info, success)
587
588   @staticmethod
589   def perspective_instance_migrate(params):
590     """Migrates an instance.
591
592     """
593     instance, target, live = params
594     instance = objects.Instance.FromDict(instance)
595     return backend.MigrateInstance(instance, target, live)
596
597   @staticmethod
598   def perspective_instance_finalize_migration_src(params):
599     """Finalize the instance migration on the source node.
600
601     """
602     instance, success, live = params
603     instance = objects.Instance.FromDict(instance)
604     return backend.FinalizeMigrationSource(instance, success, live)
605
606   @staticmethod
607   def perspective_instance_get_migration_status(params):
608     """Reports migration status.
609
610     """
611     instance = objects.Instance.FromDict(params[0])
612     return backend.GetMigrationStatus(instance).ToDict()
613
614   @staticmethod
615   def perspective_instance_reboot(params):
616     """Reboot an instance.
617
618     """
619     instance = objects.Instance.FromDict(params[0])
620     reboot_type = params[1]
621     shutdown_timeout = params[2]
622     return backend.InstanceReboot(instance, reboot_type, shutdown_timeout)
623
624   @staticmethod
625   def perspective_instance_balloon_memory(params):
626     """Modify instance runtime memory.
627
628     """
629     instance_dict, memory = params
630     instance = objects.Instance.FromDict(instance_dict)
631     return backend.InstanceBalloonMemory(instance, memory)
632
633   @staticmethod
634   def perspective_instance_info(params):
635     """Query instance information.
636
637     """
638     return backend.GetInstanceInfo(params[0], params[1])
639
640   @staticmethod
641   def perspective_instance_migratable(params):
642     """Query whether the specified instance can be migrated.
643
644     """
645     instance = objects.Instance.FromDict(params[0])
646     return backend.GetInstanceMigratable(instance)
647
648   @staticmethod
649   def perspective_all_instances_info(params):
650     """Query information about all instances.
651
652     """
653     return backend.GetAllInstancesInfo(params[0])
654
655   @staticmethod
656   def perspective_instance_list(params):
657     """Query the list of running instances.
658
659     """
660     return backend.GetInstanceList(params[0])
661
662   # node --------------------------
663
664   @staticmethod
665   def perspective_node_has_ip_address(params):
666     """Checks if a node has the given ip address.
667
668     """
669     return netutils.IPAddress.Own(params[0])
670
671   @staticmethod
672   def perspective_node_info(params):
673     """Query node information.
674
675     """
676     (vg_names, hv_names) = params
677     return backend.GetNodeInfo(vg_names, hv_names)
678
679   @staticmethod
680   def perspective_etc_hosts_modify(params):
681     """Modify a node entry in /etc/hosts.
682
683     """
684     backend.EtcHostsModify(params[0], params[1], params[2])
685
686     return True
687
688   @staticmethod
689   def perspective_node_verify(params):
690     """Run a verify sequence on this node.
691
692     """
693     return backend.VerifyNode(params[0], params[1])
694
695   @staticmethod
696   def perspective_node_start_master_daemons(params):
697     """Start the master daemons on this node.
698
699     """
700     return backend.StartMasterDaemons(params[0])
701
702   @staticmethod
703   def perspective_node_activate_master_ip(params):
704     """Activate the master IP on this node.
705
706     """
707     master_params = objects.MasterNetworkParameters.FromDict(params[0])
708     return backend.ActivateMasterIp(master_params, params[1])
709
710   @staticmethod
711   def perspective_node_deactivate_master_ip(params):
712     """Deactivate the master IP on this node.
713
714     """
715     master_params = objects.MasterNetworkParameters.FromDict(params[0])
716     return backend.DeactivateMasterIp(master_params, params[1])
717
718   @staticmethod
719   def perspective_node_stop_master(params):
720     """Stops master daemons on this node.
721
722     """
723     return backend.StopMasterDaemons()
724
725   @staticmethod
726   def perspective_node_change_master_netmask(params):
727     """Change the master IP netmask.
728
729     """
730     return backend.ChangeMasterNetmask(params[0], params[1], params[2],
731                                        params[3])
732
733   @staticmethod
734   def perspective_node_leave_cluster(params):
735     """Cleanup after leaving a cluster.
736
737     """
738     return backend.LeaveCluster(params[0])
739
740   @staticmethod
741   def perspective_node_volumes(params):
742     """Query the list of all logical volume groups.
743
744     """
745     return backend.NodeVolumes()
746
747   @staticmethod
748   def perspective_node_demote_from_mc(params):
749     """Demote a node from the master candidate role.
750
751     """
752     return backend.DemoteFromMC()
753
754   @staticmethod
755   def perspective_node_powercycle(params):
756     """Tries to powercycle the nod.
757
758     """
759     hypervisor_type = params[0]
760     return backend.PowercycleNode(hypervisor_type)
761
762   # cluster --------------------------
763
764   @staticmethod
765   def perspective_version(params):
766     """Query version information.
767
768     """
769     return constants.PROTOCOL_VERSION
770
771   @staticmethod
772   def perspective_upload_file(params):
773     """Upload a file.
774
775     Note that the backend implementation imposes strict rules on which
776     files are accepted.
777
778     """
779     return backend.UploadFile(*(params[0]))
780
781   @staticmethod
782   def perspective_master_info(params):
783     """Query master information.
784
785     """
786     return backend.GetMasterInfo()
787
788   @staticmethod
789   def perspective_run_oob(params):
790     """Runs oob on node.
791
792     """
793     output = backend.RunOob(params[0], params[1], params[2], params[3])
794     if output:
795       result = serializer.LoadJson(output)
796     else:
797       result = None
798     return result
799
800   @staticmethod
801   def perspective_write_ssconf_files(params):
802     """Write ssconf files.
803
804     """
805     (values,) = params
806     return backend.WriteSsconfFiles(values)
807
808   # os -----------------------
809
810   @staticmethod
811   def perspective_os_diagnose(params):
812     """Query detailed information about existing OSes.
813
814     """
815     return backend.DiagnoseOS()
816
817   @staticmethod
818   def perspective_os_get(params):
819     """Query information about a given OS.
820
821     """
822     name = params[0]
823     os_obj = backend.OSFromDisk(name)
824     return os_obj.ToDict()
825
826   @staticmethod
827   def perspective_os_validate(params):
828     """Run a given OS' validation routine.
829
830     """
831     required, name, checks, params = params
832     return backend.ValidateOS(required, name, checks, params)
833
834   # hooks -----------------------
835
836   @staticmethod
837   def perspective_hooks_runner(params):
838     """Run hook scripts.
839
840     """
841     hpath, phase, env = params
842     hr = backend.HooksRunner()
843     return hr.RunHooks(hpath, phase, env)
844
845   # iallocator -----------------
846
847   @staticmethod
848   def perspective_iallocator_runner(params):
849     """Run an iallocator script.
850
851     """
852     name, idata = params
853     iar = backend.IAllocatorRunner()
854     return iar.Run(name, idata)
855
856   # test -----------------------
857
858   @staticmethod
859   def perspective_test_delay(params):
860     """Run test delay.
861
862     """
863     duration = params[0]
864     status, rval = utils.TestDelay(duration)
865     if not status:
866       raise backend.RPCFail(rval)
867     return rval
868
869   # file storage ---------------
870
871   @staticmethod
872   def perspective_file_storage_dir_create(params):
873     """Create the file storage directory.
874
875     """
876     file_storage_dir = params[0]
877     return backend.CreateFileStorageDir(file_storage_dir)
878
879   @staticmethod
880   def perspective_file_storage_dir_remove(params):
881     """Remove the file storage directory.
882
883     """
884     file_storage_dir = params[0]
885     return backend.RemoveFileStorageDir(file_storage_dir)
886
887   @staticmethod
888   def perspective_file_storage_dir_rename(params):
889     """Rename the file storage directory.
890
891     """
892     old_file_storage_dir = params[0]
893     new_file_storage_dir = params[1]
894     return backend.RenameFileStorageDir(old_file_storage_dir,
895                                         new_file_storage_dir)
896
897   # jobs ------------------------
898
899   @staticmethod
900   @_RequireJobQueueLock
901   def perspective_jobqueue_update(params):
902     """Update job queue.
903
904     """
905     (file_name, content) = params
906     return backend.JobQueueUpdate(file_name, content)
907
908   @staticmethod
909   @_RequireJobQueueLock
910   def perspective_jobqueue_purge(params):
911     """Purge job queue.
912
913     """
914     return backend.JobQueuePurge()
915
916   @staticmethod
917   @_RequireJobQueueLock
918   def perspective_jobqueue_rename(params):
919     """Rename a job queue file.
920
921     """
922     # TODO: What if a file fails to rename?
923     return [backend.JobQueueRename(old, new) for old, new in params[0]]
924
925   # hypervisor ---------------
926
927   @staticmethod
928   def perspective_hypervisor_validate_params(params):
929     """Validate the hypervisor parameters.
930
931     """
932     (hvname, hvparams) = params
933     return backend.ValidateHVParams(hvname, hvparams)
934
935   # Crypto
936
937   @staticmethod
938   def perspective_x509_cert_create(params):
939     """Creates a new X509 certificate for SSL/TLS.
940
941     """
942     (validity, ) = params
943     return backend.CreateX509Certificate(validity)
944
945   @staticmethod
946   def perspective_x509_cert_remove(params):
947     """Removes a X509 certificate.
948
949     """
950     (name, ) = params
951     return backend.RemoveX509Certificate(name)
952
953   # Import and export
954
955   @staticmethod
956   def perspective_import_start(params):
957     """Starts an import daemon.
958
959     """
960     (opts_s, instance, component, (dest, dest_args)) = params
961
962     opts = objects.ImportExportOptions.FromDict(opts_s)
963
964     return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts,
965                                            None, None,
966                                            objects.Instance.FromDict(instance),
967                                            component, dest,
968                                            _DecodeImportExportIO(dest,
969                                                                  dest_args))
970
971   @staticmethod
972   def perspective_export_start(params):
973     """Starts an export daemon.
974
975     """
976     (opts_s, host, port, instance, component, (source, source_args)) = params
977
978     opts = objects.ImportExportOptions.FromDict(opts_s)
979
980     return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts,
981                                            host, port,
982                                            objects.Instance.FromDict(instance),
983                                            component, source,
984                                            _DecodeImportExportIO(source,
985                                                                  source_args))
986
987   @staticmethod
988   def perspective_impexp_status(params):
989     """Retrieves the status of an import or export daemon.
990
991     """
992     return backend.GetImportExportStatus(params[0])
993
994   @staticmethod
995   def perspective_impexp_abort(params):
996     """Aborts an import or export.
997
998     """
999     return backend.AbortImportExport(params[0])
1000
1001   @staticmethod
1002   def perspective_impexp_cleanup(params):
1003     """Cleans up after an import or export.
1004
1005     """
1006     return backend.CleanupImportExport(params[0])
1007
1008
1009 def CheckNoded(_, args):
1010   """Initial checks whether to run or exit with a failure.
1011
1012   """
1013   if args: # noded doesn't take any arguments
1014     print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
1015                           sys.argv[0])
1016     sys.exit(constants.EXIT_FAILURE)
1017   try:
1018     codecs.lookup("string-escape")
1019   except LookupError:
1020     print >> sys.stderr, ("Can't load the string-escape code which is part"
1021                           " of the Python installation. Is your installation"
1022                           " complete/correct? Aborting.")
1023     sys.exit(constants.EXIT_FAILURE)
1024
1025
1026 def PrepNoded(options, _):
1027   """Preparation node daemon function, executed with the PID file held.
1028
1029   """
1030   if options.mlock:
1031     request_executor_class = MlockallRequestExecutor
1032     try:
1033       utils.Mlockall()
1034     except errors.NoCtypesError:
1035       logging.warning("Cannot set memory lock, ctypes module not found")
1036       request_executor_class = http.server.HttpServerRequestExecutor
1037   else:
1038     request_executor_class = http.server.HttpServerRequestExecutor
1039
1040   # Read SSL certificate
1041   if options.ssl:
1042     ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
1043                                     ssl_cert_path=options.ssl_cert)
1044   else:
1045     ssl_params = None
1046
1047   err = _PrepareQueueLock()
1048   if err is not None:
1049     # this might be some kind of file-system/permission error; while
1050     # this breaks the job queue functionality, we shouldn't prevent
1051     # startup of the whole node daemon because of this
1052     logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)
1053
1054   mainloop = daemon.Mainloop()
1055   server = NodeHttpServer(mainloop, options.bind_address, options.port,
1056                           ssl_params=ssl_params, ssl_verify_peer=True,
1057                           request_executor_class=request_executor_class)
1058   server.Start()
1059   return (mainloop, server)
1060
1061
1062 def ExecNoded(options, args, prep_data): # pylint: disable=W0613
1063   """Main node daemon function, executed with the PID file held.
1064
1065   """
1066   (mainloop, server) = prep_data
1067   try:
1068     mainloop.Run()
1069   finally:
1070     server.Stop()
1071
1072
1073 def Main():
1074   """Main function for the node daemon.
1075
1076   """
1077   parser = OptionParser(description="Ganeti node daemon",
1078                         usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
1079                         version="%%prog (ganeti) %s" %
1080                         constants.RELEASE_VERSION)
1081   parser.add_option("--no-mlock", dest="mlock",
1082                     help="Do not mlock the node memory in ram",
1083                     default=True, action="store_false")
1084
1085   daemon.GenericMain(constants.NODED, parser, CheckNoded, PrepNoded, ExecNoded,
1086                      default_ssl_cert=constants.NODED_CERT_FILE,
1087                      default_ssl_key=constants.NODED_CERT_FILE,
1088                      console_logging=True)