hv_kvm: initial support for CPU pinning
[ganeti-local] / lib / server / noded.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti node daemon"""
23
24 # pylint: disable-msg=C0103,W0142
25
26 # C0103: Functions in this module need to have a given name structure,
27 # and the name of the daemon doesn't match
28
29 # W0142: Used * or ** magic, since we do use it extensively in this
30 # module
31
32 import os
33 import sys
34 import logging
35 import signal
36 import codecs
37
38 from optparse import OptionParser
39
40 from ganeti import backend
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import errors
44 from ganeti import jstore
45 from ganeti import daemon
46 from ganeti import http
47 from ganeti import utils
48 from ganeti import storage
49 from ganeti import serializer
50 from ganeti import netutils
51
52 import ganeti.http.server # pylint: disable-msg=W0611
53
54
55 queue_lock = None
56
57
58 def _PrepareQueueLock():
59   """Try to prepare the queue lock.
60
61   @return: None for success, otherwise an exception object
62
63   """
64   global queue_lock # pylint: disable-msg=W0603
65
66   if queue_lock is not None:
67     return None
68
69   # Prepare job queue
70   try:
71     queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
72     return None
73   except EnvironmentError, err:
74     return err
75
76
77 def _RequireJobQueueLock(fn):
78   """Decorator for job queue manipulating functions.
79
80   """
81   QUEUE_LOCK_TIMEOUT = 10
82
83   def wrapper(*args, **kwargs):
84     # Locking in exclusive, blocking mode because there could be several
85     # children running at the same time. Waiting up to 10 seconds.
86     if _PrepareQueueLock() is not None:
87       raise errors.JobQueueError("Job queue failed initialization,"
88                                  " cannot update jobs")
89     queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
90     try:
91       return fn(*args, **kwargs)
92     finally:
93       queue_lock.Unlock()
94
95   return wrapper
96
97
98 def _DecodeImportExportIO(ieio, ieioargs):
99   """Decodes import/export I/O information.
100
101   """
102   if ieio == constants.IEIO_RAW_DISK:
103     assert len(ieioargs) == 1
104     return (objects.Disk.FromDict(ieioargs[0]), )
105
106   if ieio == constants.IEIO_SCRIPT:
107     assert len(ieioargs) == 2
108     return (objects.Disk.FromDict(ieioargs[0]), ieioargs[1])
109
110   return ieioargs
111
112
113 class MlockallRequestExecutor(http.server.HttpServerRequestExecutor):
114   """Custom Request Executor class that ensures NodeHttpServer children are
115   locked in ram.
116
117   """
118   def __init__(self, *args, **kwargs):
119     utils.Mlockall()
120
121     http.server.HttpServerRequestExecutor.__init__(self, *args, **kwargs)
122
123
124 class NodeHttpServer(http.server.HttpServer):
125   """The server implementation.
126
127   This class holds all methods exposed over the RPC interface.
128
129   """
130   # too many public methods, and unused args - all methods get params
131   # due to the API
132   # pylint: disable-msg=R0904,W0613
133   def __init__(self, *args, **kwargs):
134     http.server.HttpServer.__init__(self, *args, **kwargs)
135     self.noded_pid = os.getpid()
136
137   def HandleRequest(self, req):
138     """Handle a request.
139
140     """
141     if req.request_method.upper() != http.HTTP_PUT:
142       raise http.HttpBadRequest()
143
144     path = req.request_path
145     if path.startswith("/"):
146       path = path[1:]
147
148     method = getattr(self, "perspective_%s" % path, None)
149     if method is None:
150       raise http.HttpNotFound()
151
152     try:
153       result = (True, method(serializer.LoadJson(req.request_body)))
154
155     except backend.RPCFail, err:
156       # our custom failure exception; str(err) works fine if the
157       # exception was constructed with a single argument, and in
158       # this case, err.message == err.args[0] == str(err)
159       result = (False, str(err))
160     except errors.QuitGanetiException, err:
161       # Tell parent to quit
162       logging.info("Shutting down the node daemon, arguments: %s",
163                    str(err.args))
164       os.kill(self.noded_pid, signal.SIGTERM)
165       # And return the error's arguments, which must be already in
166       # correct tuple format
167       result = err.args
168     except Exception, err:
169       logging.exception("Error in RPC call")
170       result = (False, "Error while executing backend function: %s" % str(err))
171
172     return serializer.DumpJson(result, indent=False)
173
174   # the new block devices  --------------------------
175
176   @staticmethod
177   def perspective_blockdev_create(params):
178     """Create a block device.
179
180     """
181     bdev_s, size, owner, on_primary, info = params
182     bdev = objects.Disk.FromDict(bdev_s)
183     if bdev is None:
184       raise ValueError("can't unserialize data!")
185     return backend.BlockdevCreate(bdev, size, owner, on_primary, info)
186
187   @staticmethod
188   def perspective_blockdev_pause_resume_sync(params):
189     """Pause/resume sync of a block device.
190
191     """
192     disks_s, pause = params
193     disks = [objects.Disk.FromDict(bdev_s) for bdev_s in disks_s]
194     return backend.BlockdevPauseResumeSync(disks, pause)
195
196   @staticmethod
197   def perspective_blockdev_wipe(params):
198     """Wipe a block device.
199
200     """
201     bdev_s, offset, size = params
202     bdev = objects.Disk.FromDict(bdev_s)
203     return backend.BlockdevWipe(bdev, offset, size)
204
205   @staticmethod
206   def perspective_blockdev_remove(params):
207     """Remove a block device.
208
209     """
210     bdev_s = params[0]
211     bdev = objects.Disk.FromDict(bdev_s)
212     return backend.BlockdevRemove(bdev)
213
214   @staticmethod
215   def perspective_blockdev_rename(params):
216     """Remove a block device.
217
218     """
219     devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
220     return backend.BlockdevRename(devlist)
221
222   @staticmethod
223   def perspective_blockdev_assemble(params):
224     """Assemble a block device.
225
226     """
227     bdev_s, owner, on_primary, idx = params
228     bdev = objects.Disk.FromDict(bdev_s)
229     if bdev is None:
230       raise ValueError("can't unserialize data!")
231     return backend.BlockdevAssemble(bdev, owner, on_primary, idx)
232
233   @staticmethod
234   def perspective_blockdev_shutdown(params):
235     """Shutdown a block device.
236
237     """
238     bdev_s = params[0]
239     bdev = objects.Disk.FromDict(bdev_s)
240     if bdev is None:
241       raise ValueError("can't unserialize data!")
242     return backend.BlockdevShutdown(bdev)
243
244   @staticmethod
245   def perspective_blockdev_addchildren(params):
246     """Add a child to a mirror device.
247
248     Note: this is only valid for mirror devices. It's the caller's duty
249     to send a correct disk, otherwise we raise an error.
250
251     """
252     bdev_s, ndev_s = params
253     bdev = objects.Disk.FromDict(bdev_s)
254     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
255     if bdev is None or ndevs.count(None) > 0:
256       raise ValueError("can't unserialize data!")
257     return backend.BlockdevAddchildren(bdev, ndevs)
258
259   @staticmethod
260   def perspective_blockdev_removechildren(params):
261     """Remove a child from a mirror device.
262
263     This is only valid for mirror devices, of course. It's the callers
264     duty to send a correct disk, otherwise we raise an error.
265
266     """
267     bdev_s, ndev_s = params
268     bdev = objects.Disk.FromDict(bdev_s)
269     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
270     if bdev is None or ndevs.count(None) > 0:
271       raise ValueError("can't unserialize data!")
272     return backend.BlockdevRemovechildren(bdev, ndevs)
273
274   @staticmethod
275   def perspective_blockdev_getmirrorstatus(params):
276     """Return the mirror status for a list of disks.
277
278     """
279     disks = [objects.Disk.FromDict(dsk_s)
280              for dsk_s in params]
281     return [status.ToDict()
282             for status in backend.BlockdevGetmirrorstatus(disks)]
283
284   @staticmethod
285   def perspective_blockdev_getmirrorstatus_multi(params):
286     """Return the mirror status for a list of disks.
287
288     """
289     (node_disks, ) = params
290
291     node_name = netutils.Hostname.GetSysName()
292
293     disks = [objects.Disk.FromDict(dsk_s)
294              for dsk_s in node_disks.get(node_name, [])]
295
296     result = []
297
298     for (success, status) in backend.BlockdevGetmirrorstatusMulti(disks):
299       if success:
300         result.append((success, status.ToDict()))
301       else:
302         result.append((success, status))
303
304     return result
305
306   @staticmethod
307   def perspective_blockdev_find(params):
308     """Expose the FindBlockDevice functionality for a disk.
309
310     This will try to find but not activate a disk.
311
312     """
313     disk = objects.Disk.FromDict(params[0])
314
315     result = backend.BlockdevFind(disk)
316     if result is None:
317       return None
318
319     return result.ToDict()
320
321   @staticmethod
322   def perspective_blockdev_snapshot(params):
323     """Create a snapshot device.
324
325     Note that this is only valid for LVM disks, if we get passed
326     something else we raise an exception. The snapshot device can be
327     remove by calling the generic block device remove call.
328
329     """
330     cfbd = objects.Disk.FromDict(params[0])
331     return backend.BlockdevSnapshot(cfbd)
332
333   @staticmethod
334   def perspective_blockdev_grow(params):
335     """Grow a stack of devices.
336
337     """
338     cfbd = objects.Disk.FromDict(params[0])
339     amount = params[1]
340     dryrun = params[2]
341     return backend.BlockdevGrow(cfbd, amount, dryrun)
342
343   @staticmethod
344   def perspective_blockdev_close(params):
345     """Closes the given block devices.
346
347     """
348     disks = [objects.Disk.FromDict(cf) for cf in params[1]]
349     return backend.BlockdevClose(params[0], disks)
350
351   @staticmethod
352   def perspective_blockdev_getsize(params):
353     """Compute the sizes of the given block devices.
354
355     """
356     disks = [objects.Disk.FromDict(cf) for cf in params[0]]
357     return backend.BlockdevGetsize(disks)
358
359   @staticmethod
360   def perspective_blockdev_export(params):
361     """Compute the sizes of the given block devices.
362
363     """
364     disk = objects.Disk.FromDict(params[0])
365     dest_node, dest_path, cluster_name = params[1:]
366     return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)
367
368   # blockdev/drbd specific methods ----------
369
370   @staticmethod
371   def perspective_drbd_disconnect_net(params):
372     """Disconnects the network connection of drbd disks.
373
374     Note that this is only valid for drbd disks, so the members of the
375     disk list must all be drbd devices.
376
377     """
378     nodes_ip, disks = params
379     disks = [objects.Disk.FromDict(cf) for cf in disks]
380     return backend.DrbdDisconnectNet(nodes_ip, disks)
381
382   @staticmethod
383   def perspective_drbd_attach_net(params):
384     """Attaches the network connection of drbd disks.
385
386     Note that this is only valid for drbd disks, so the members of the
387     disk list must all be drbd devices.
388
389     """
390     nodes_ip, disks, instance_name, multimaster = params
391     disks = [objects.Disk.FromDict(cf) for cf in disks]
392     return backend.DrbdAttachNet(nodes_ip, disks,
393                                      instance_name, multimaster)
394
395   @staticmethod
396   def perspective_drbd_wait_sync(params):
397     """Wait until DRBD disks are synched.
398
399     Note that this is only valid for drbd disks, so the members of the
400     disk list must all be drbd devices.
401
402     """
403     nodes_ip, disks = params
404     disks = [objects.Disk.FromDict(cf) for cf in disks]
405     return backend.DrbdWaitSync(nodes_ip, disks)
406
407   @staticmethod
408   def perspective_drbd_helper(params):
409     """Query drbd helper.
410
411     """
412     return backend.GetDrbdUsermodeHelper()
413
414   # export/import  --------------------------
415
416   @staticmethod
417   def perspective_finalize_export(params):
418     """Expose the finalize export functionality.
419
420     """
421     instance = objects.Instance.FromDict(params[0])
422
423     snap_disks = []
424     for disk in params[1]:
425       if isinstance(disk, bool):
426         snap_disks.append(disk)
427       else:
428         snap_disks.append(objects.Disk.FromDict(disk))
429
430     return backend.FinalizeExport(instance, snap_disks)
431
432   @staticmethod
433   def perspective_export_info(params):
434     """Query information about an existing export on this node.
435
436     The given path may not contain an export, in which case we return
437     None.
438
439     """
440     path = params[0]
441     return backend.ExportInfo(path)
442
443   @staticmethod
444   def perspective_export_list(params):
445     """List the available exports on this node.
446
447     Note that as opposed to export_info, which may query data about an
448     export in any path, this only queries the standard Ganeti path
449     (constants.EXPORT_DIR).
450
451     """
452     return backend.ListExports()
453
454   @staticmethod
455   def perspective_export_remove(params):
456     """Remove an export.
457
458     """
459     export = params[0]
460     return backend.RemoveExport(export)
461
462   # block device ---------------------
463   @staticmethod
464   def perspective_bdev_sizes(params):
465     """Query the list of block devices
466
467     """
468     devices = params[0]
469     return backend.GetBlockDevSizes(devices)
470
471   # volume  --------------------------
472
473   @staticmethod
474   def perspective_lv_list(params):
475     """Query the list of logical volumes in a given volume group.
476
477     """
478     vgname = params[0]
479     return backend.GetVolumeList(vgname)
480
481   @staticmethod
482   def perspective_vg_list(params):
483     """Query the list of volume groups.
484
485     """
486     return backend.ListVolumeGroups()
487
488   # Storage --------------------------
489
490   @staticmethod
491   def perspective_storage_list(params):
492     """Get list of storage units.
493
494     """
495     (su_name, su_args, name, fields) = params
496     return storage.GetStorage(su_name, *su_args).List(name, fields)
497
498   @staticmethod
499   def perspective_storage_modify(params):
500     """Modify a storage unit.
501
502     """
503     (su_name, su_args, name, changes) = params
504     return storage.GetStorage(su_name, *su_args).Modify(name, changes)
505
506   @staticmethod
507   def perspective_storage_execute(params):
508     """Execute an operation on a storage unit.
509
510     """
511     (su_name, su_args, name, op) = params
512     return storage.GetStorage(su_name, *su_args).Execute(name, op)
513
514   # bridge  --------------------------
515
516   @staticmethod
517   def perspective_bridges_exist(params):
518     """Check if all bridges given exist on this node.
519
520     """
521     bridges_list = params[0]
522     return backend.BridgesExist(bridges_list)
523
524   # instance  --------------------------
525
526   @staticmethod
527   def perspective_instance_os_add(params):
528     """Install an OS on a given instance.
529
530     """
531     inst_s = params[0]
532     inst = objects.Instance.FromDict(inst_s)
533     reinstall = params[1]
534     debug = params[2]
535     return backend.InstanceOsAdd(inst, reinstall, debug)
536
537   @staticmethod
538   def perspective_instance_run_rename(params):
539     """Runs the OS rename script for an instance.
540
541     """
542     inst_s, old_name, debug = params
543     inst = objects.Instance.FromDict(inst_s)
544     return backend.RunRenameInstance(inst, old_name, debug)
545
546   @staticmethod
547   def perspective_instance_shutdown(params):
548     """Shutdown an instance.
549
550     """
551     instance = objects.Instance.FromDict(params[0])
552     timeout = params[1]
553     return backend.InstanceShutdown(instance, timeout)
554
555   @staticmethod
556   def perspective_instance_start(params):
557     """Start an instance.
558
559     """
560     (instance_name, startup_paused) = params
561     instance = objects.Instance.FromDict(instance_name)
562     return backend.StartInstance(instance, startup_paused)
563
564   @staticmethod
565   def perspective_migration_info(params):
566     """Gather information about an instance to be migrated.
567
568     """
569     instance = objects.Instance.FromDict(params[0])
570     return backend.MigrationInfo(instance)
571
572   @staticmethod
573   def perspective_accept_instance(params):
574     """Prepare the node to accept an instance.
575
576     """
577     instance, info, target = params
578     instance = objects.Instance.FromDict(instance)
579     return backend.AcceptInstance(instance, info, target)
580
581   @staticmethod
582   def perspective_finalize_migration(params):
583     """Finalize the instance migration.
584
585     """
586     instance, info, success = params
587     instance = objects.Instance.FromDict(instance)
588     return backend.FinalizeMigration(instance, info, success)
589
590   @staticmethod
591   def perspective_instance_migrate(params):
592     """Migrates an instance.
593
594     """
595     instance, target, live = params
596     instance = objects.Instance.FromDict(instance)
597     return backend.MigrateInstance(instance, target, live)
598
599   @staticmethod
600   def perspective_instance_reboot(params):
601     """Reboot an instance.
602
603     """
604     instance = objects.Instance.FromDict(params[0])
605     reboot_type = params[1]
606     shutdown_timeout = params[2]
607     return backend.InstanceReboot(instance, reboot_type, shutdown_timeout)
608
609   @staticmethod
610   def perspective_instance_info(params):
611     """Query instance information.
612
613     """
614     return backend.GetInstanceInfo(params[0], params[1])
615
616   @staticmethod
617   def perspective_instance_migratable(params):
618     """Query whether the specified instance can be migrated.
619
620     """
621     instance = objects.Instance.FromDict(params[0])
622     return backend.GetInstanceMigratable(instance)
623
624   @staticmethod
625   def perspective_all_instances_info(params):
626     """Query information about all instances.
627
628     """
629     return backend.GetAllInstancesInfo(params[0])
630
631   @staticmethod
632   def perspective_instance_list(params):
633     """Query the list of running instances.
634
635     """
636     return backend.GetInstanceList(params[0])
637
638   # node --------------------------
639
640   @staticmethod
641   def perspective_node_tcp_ping(params):
642     """Do a TcpPing on the remote node.
643
644     """
645     return netutils.TcpPing(params[1], params[2], timeout=params[3],
646                             live_port_needed=params[4], source=params[0])
647
648   @staticmethod
649   def perspective_node_has_ip_address(params):
650     """Checks if a node has the given ip address.
651
652     """
653     return netutils.IPAddress.Own(params[0])
654
655   @staticmethod
656   def perspective_node_info(params):
657     """Query node information.
658
659     """
660     vgname, hypervisor_type = params
661     return backend.GetNodeInfo(vgname, hypervisor_type)
662
663   @staticmethod
664   def perspective_etc_hosts_modify(params):
665     """Modify a node entry in /etc/hosts.
666
667     """
668     backend.EtcHostsModify(params[0], params[1], params[2])
669
670     return True
671
672   @staticmethod
673   def perspective_node_verify(params):
674     """Run a verify sequence on this node.
675
676     """
677     return backend.VerifyNode(params[0], params[1])
678
679   @staticmethod
680   def perspective_node_start_master(params):
681     """Promote this node to master status.
682
683     """
684     return backend.StartMaster(params[0], params[1])
685
686   @staticmethod
687   def perspective_node_stop_master(params):
688     """Demote this node from master status.
689
690     """
691     return backend.StopMaster(params[0])
692
693   @staticmethod
694   def perspective_node_leave_cluster(params):
695     """Cleanup after leaving a cluster.
696
697     """
698     return backend.LeaveCluster(params[0])
699
700   @staticmethod
701   def perspective_node_volumes(params):
702     """Query the list of all logical volume groups.
703
704     """
705     return backend.NodeVolumes()
706
707   @staticmethod
708   def perspective_node_demote_from_mc(params):
709     """Demote a node from the master candidate role.
710
711     """
712     return backend.DemoteFromMC()
713
714
715   @staticmethod
716   def perspective_node_powercycle(params):
717     """Tries to powercycle the nod.
718
719     """
720     hypervisor_type = params[0]
721     return backend.PowercycleNode(hypervisor_type)
722
723
724   # cluster --------------------------
725
726   @staticmethod
727   def perspective_version(params):
728     """Query version information.
729
730     """
731     return constants.PROTOCOL_VERSION
732
733   @staticmethod
734   def perspective_upload_file(params):
735     """Upload a file.
736
737     Note that the backend implementation imposes strict rules on which
738     files are accepted.
739
740     """
741     return backend.UploadFile(*params)
742
743   @staticmethod
744   def perspective_master_info(params):
745     """Query master information.
746
747     """
748     return backend.GetMasterInfo()
749
750   @staticmethod
751   def perspective_run_oob(params):
752     """Runs oob on node.
753
754     """
755     output = backend.RunOob(params[0], params[1], params[2], params[3])
756     if output:
757       result = serializer.LoadJson(output)
758     else:
759       result = None
760     return result
761
762   @staticmethod
763   def perspective_write_ssconf_files(params):
764     """Write ssconf files.
765
766     """
767     (values,) = params
768     return backend.WriteSsconfFiles(values)
769
770   # os -----------------------
771
772   @staticmethod
773   def perspective_os_diagnose(params):
774     """Query detailed information about existing OSes.
775
776     """
777     return backend.DiagnoseOS()
778
779   @staticmethod
780   def perspective_os_get(params):
781     """Query information about a given OS.
782
783     """
784     name = params[0]
785     os_obj = backend.OSFromDisk(name)
786     return os_obj.ToDict()
787
788   @staticmethod
789   def perspective_os_validate(params):
790     """Run a given OS' validation routine.
791
792     """
793     required, name, checks, params = params
794     return backend.ValidateOS(required, name, checks, params)
795
796   # hooks -----------------------
797
798   @staticmethod
799   def perspective_hooks_runner(params):
800     """Run hook scripts.
801
802     """
803     hpath, phase, env = params
804     hr = backend.HooksRunner()
805     return hr.RunHooks(hpath, phase, env)
806
807   # iallocator -----------------
808
809   @staticmethod
810   def perspective_iallocator_runner(params):
811     """Run an iallocator script.
812
813     """
814     name, idata = params
815     iar = backend.IAllocatorRunner()
816     return iar.Run(name, idata)
817
818   # test -----------------------
819
820   @staticmethod
821   def perspective_test_delay(params):
822     """Run test delay.
823
824     """
825     duration = params[0]
826     status, rval = utils.TestDelay(duration)
827     if not status:
828       raise backend.RPCFail(rval)
829     return rval
830
831   # file storage ---------------
832
833   @staticmethod
834   def perspective_file_storage_dir_create(params):
835     """Create the file storage directory.
836
837     """
838     file_storage_dir = params[0]
839     return backend.CreateFileStorageDir(file_storage_dir)
840
841   @staticmethod
842   def perspective_file_storage_dir_remove(params):
843     """Remove the file storage directory.
844
845     """
846     file_storage_dir = params[0]
847     return backend.RemoveFileStorageDir(file_storage_dir)
848
849   @staticmethod
850   def perspective_file_storage_dir_rename(params):
851     """Rename the file storage directory.
852
853     """
854     old_file_storage_dir = params[0]
855     new_file_storage_dir = params[1]
856     return backend.RenameFileStorageDir(old_file_storage_dir,
857                                         new_file_storage_dir)
858
859   # jobs ------------------------
860
861   @staticmethod
862   @_RequireJobQueueLock
863   def perspective_jobqueue_update(params):
864     """Update job queue.
865
866     """
867     (file_name, content) = params
868     return backend.JobQueueUpdate(file_name, content)
869
870   @staticmethod
871   @_RequireJobQueueLock
872   def perspective_jobqueue_purge(params):
873     """Purge job queue.
874
875     """
876     return backend.JobQueuePurge()
877
878   @staticmethod
879   @_RequireJobQueueLock
880   def perspective_jobqueue_rename(params):
881     """Rename a job queue file.
882
883     """
884     # TODO: What if a file fails to rename?
885     return [backend.JobQueueRename(old, new) for old, new in params]
886
887   # hypervisor ---------------
888
889   @staticmethod
890   def perspective_hypervisor_validate_params(params):
891     """Validate the hypervisor parameters.
892
893     """
894     (hvname, hvparams) = params
895     return backend.ValidateHVParams(hvname, hvparams)
896
897   # Crypto
898
899   @staticmethod
900   def perspective_x509_cert_create(params):
901     """Creates a new X509 certificate for SSL/TLS.
902
903     """
904     (validity, ) = params
905     return backend.CreateX509Certificate(validity)
906
907   @staticmethod
908   def perspective_x509_cert_remove(params):
909     """Removes a X509 certificate.
910
911     """
912     (name, ) = params
913     return backend.RemoveX509Certificate(name)
914
915   # Import and export
916
917   @staticmethod
918   def perspective_import_start(params):
919     """Starts an import daemon.
920
921     """
922     (opts_s, instance, component, dest, dest_args) = params
923
924     opts = objects.ImportExportOptions.FromDict(opts_s)
925
926     return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts,
927                                            None, None,
928                                            objects.Instance.FromDict(instance),
929                                            component, dest,
930                                            _DecodeImportExportIO(dest,
931                                                                  dest_args))
932
933   @staticmethod
934   def perspective_export_start(params):
935     """Starts an export daemon.
936
937     """
938     (opts_s, host, port, instance, component, source, source_args) = params
939
940     opts = objects.ImportExportOptions.FromDict(opts_s)
941
942     return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts,
943                                            host, port,
944                                            objects.Instance.FromDict(instance),
945                                            component, source,
946                                            _DecodeImportExportIO(source,
947                                                                  source_args))
948
949   @staticmethod
950   def perspective_impexp_status(params):
951     """Retrieves the status of an import or export daemon.
952
953     """
954     return backend.GetImportExportStatus(params[0])
955
956   @staticmethod
957   def perspective_impexp_abort(params):
958     """Aborts an import or export.
959
960     """
961     return backend.AbortImportExport(params[0])
962
963   @staticmethod
964   def perspective_impexp_cleanup(params):
965     """Cleans up after an import or export.
966
967     """
968     return backend.CleanupImportExport(params[0])
969
970
971 def CheckNoded(_, args):
972   """Initial checks whether to run or exit with a failure.
973
974   """
975   if args: # noded doesn't take any arguments
976     print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
977                           sys.argv[0])
978     sys.exit(constants.EXIT_FAILURE)
979   try:
980     codecs.lookup("string-escape")
981   except LookupError:
982     print >> sys.stderr, ("Can't load the string-escape code which is part"
983                           " of the Python installation. Is your installation"
984                           " complete/correct? Aborting.")
985     sys.exit(constants.EXIT_FAILURE)
986
987
988 def PrepNoded(options, _):
989   """Preparation node daemon function, executed with the PID file held.
990
991   """
992   if options.mlock:
993     request_executor_class = MlockallRequestExecutor
994     try:
995       utils.Mlockall()
996     except errors.NoCtypesError:
997       logging.warning("Cannot set memory lock, ctypes module not found")
998       request_executor_class = http.server.HttpServerRequestExecutor
999   else:
1000     request_executor_class = http.server.HttpServerRequestExecutor
1001
1002   # Read SSL certificate
1003   if options.ssl:
1004     ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
1005                                     ssl_cert_path=options.ssl_cert)
1006   else:
1007     ssl_params = None
1008
1009   err = _PrepareQueueLock()
1010   if err is not None:
1011     # this might be some kind of file-system/permission error; while
1012     # this breaks the job queue functionality, we shouldn't prevent
1013     # startup of the whole node daemon because of this
1014     logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)
1015
1016   mainloop = daemon.Mainloop()
1017   server = NodeHttpServer(mainloop, options.bind_address, options.port,
1018                           ssl_params=ssl_params, ssl_verify_peer=True,
1019                           request_executor_class=request_executor_class)
1020   server.Start()
1021   return (mainloop, server)
1022
1023
1024 def ExecNoded(options, args, prep_data): # pylint: disable-msg=W0613
1025   """Main node daemon function, executed with the PID file held.
1026
1027   """
1028   (mainloop, server) = prep_data
1029   try:
1030     mainloop.Run()
1031   finally:
1032     server.Stop()
1033
1034
1035 def Main():
1036   """Main function for the node daemon.
1037
1038   """
1039   parser = OptionParser(description="Ganeti node daemon",
1040                         usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
1041                         version="%%prog (ganeti) %s" %
1042                         constants.RELEASE_VERSION)
1043   parser.add_option("--no-mlock", dest="mlock",
1044                     help="Do not mlock the node memory in ram",
1045                     default=True, action="store_false")
1046
1047   daemon.GenericMain(constants.NODED, parser, CheckNoded, PrepNoded, ExecNoded,
1048                      default_ssl_cert=constants.NODED_CERT_FILE,
1049                      default_ssl_key=constants.NODED_CERT_FILE,
1050                      console_logging=True)