Add cluster name to instance migration RPC
[ganeti-local] / lib / server / noded.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti node daemon"""
23
24 # pylint: disable=C0103,W0142
25
26 # C0103: Functions in this module need to have a given name structure,
27 # and the name of the daemon doesn't match
28
29 # W0142: Used * or ** magic, since we do use it extensively in this
30 # module
31
32 import os
33 import sys
34 import logging
35 import signal
36 import codecs
37
38 from optparse import OptionParser
39
40 from ganeti import backend
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import errors
44 from ganeti import jstore
45 from ganeti import daemon
46 from ganeti import http
47 from ganeti import utils
48 from ganeti.storage import container
49 from ganeti import serializer
50 from ganeti import netutils
51 from ganeti import pathutils
52 from ganeti import ssconf
53
54 import ganeti.http.server # pylint: disable=W0611
55
56
57 queue_lock = None
58
59
60 def _extendReasonTrail(trail, source, reason=""):
61   """Extend the reason trail with noded information
62
63   The trail is extended by appending the name of the noded functionality
64   """
65   assert trail is not None
66   trail_source = "%s:%s" % (constants.OPCODE_REASON_SRC_NODED, source)
67   trail.append((trail_source, reason, utils.EpochNano()))
68
69
70 def _PrepareQueueLock():
71   """Try to prepare the queue lock.
72
73   @return: None for success, otherwise an exception object
74
75   """
76   global queue_lock # pylint: disable=W0603
77
78   if queue_lock is not None:
79     return None
80
81   # Prepare job queue
82   try:
83     queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
84     return None
85   except EnvironmentError, err:
86     return err
87
88
89 def _RequireJobQueueLock(fn):
90   """Decorator for job queue manipulating functions.
91
92   """
93   QUEUE_LOCK_TIMEOUT = 10
94
95   def wrapper(*args, **kwargs):
96     # Locking in exclusive, blocking mode because there could be several
97     # children running at the same time. Waiting up to 10 seconds.
98     if _PrepareQueueLock() is not None:
99       raise errors.JobQueueError("Job queue failed initialization,"
100                                  " cannot update jobs")
101     queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
102     try:
103       return fn(*args, **kwargs)
104     finally:
105       queue_lock.Unlock()
106
107   return wrapper
108
109
110 def _DecodeImportExportIO(ieio, ieioargs):
111   """Decodes import/export I/O information.
112
113   """
114   if ieio == constants.IEIO_RAW_DISK:
115     assert len(ieioargs) == 1
116     return (objects.Disk.FromDict(ieioargs[0]), )
117
118   if ieio == constants.IEIO_SCRIPT:
119     assert len(ieioargs) == 2
120     return (objects.Disk.FromDict(ieioargs[0]), ieioargs[1])
121
122   return ieioargs
123
124
125 def _DefaultAlternative(value, default):
126   """Returns value or, if evaluating to False, a default value.
127
128   Returns the given value, unless it evaluates to False. In the latter case the
129   default value is returned.
130
131   @param value: Value to return if it doesn't evaluate to False
132   @param default: Default value
133   @return: Given value or the default
134
135   """
136   if value:
137     return value
138
139   return default
140
141
142 class MlockallRequestExecutor(http.server.HttpServerRequestExecutor):
143   """Subclass ensuring request handlers are locked in RAM.
144
145   """
146   def __init__(self, *args, **kwargs):
147     utils.Mlockall()
148
149     http.server.HttpServerRequestExecutor.__init__(self, *args, **kwargs)
150
151
152 class NodeRequestHandler(http.server.HttpServerHandler):
153   """The server implementation.
154
155   This class holds all methods exposed over the RPC interface.
156
157   """
158   # too many public methods, and unused args - all methods get params
159   # due to the API
160   # pylint: disable=R0904,W0613
161   def __init__(self):
162     http.server.HttpServerHandler.__init__(self)
163     self.noded_pid = os.getpid()
164
165   def HandleRequest(self, req):
166     """Handle a request.
167
168     """
169     if req.request_method.upper() != http.HTTP_POST:
170       raise http.HttpBadRequest("Only the POST method is supported")
171
172     path = req.request_path
173     if path.startswith("/"):
174       path = path[1:]
175
176     method = getattr(self, "perspective_%s" % path, None)
177     if method is None:
178       raise http.HttpNotFound()
179
180     try:
181       result = (True, method(serializer.LoadJson(req.request_body)))
182
183     except backend.RPCFail, err:
184       # our custom failure exception; str(err) works fine if the
185       # exception was constructed with a single argument, and in
186       # this case, err.message == err.args[0] == str(err)
187       result = (False, str(err))
188     except errors.QuitGanetiException, err:
189       # Tell parent to quit
190       logging.info("Shutting down the node daemon, arguments: %s",
191                    str(err.args))
192       os.kill(self.noded_pid, signal.SIGTERM)
193       # And return the error's arguments, which must be already in
194       # correct tuple format
195       result = err.args
196     except Exception, err:
197       logging.exception("Error in RPC call")
198       result = (False, "Error while executing backend function: %s" % str(err))
199
200     return serializer.DumpJson(result)
201
202   # the new block devices  --------------------------
203
204   @staticmethod
205   def perspective_blockdev_create(params):
206     """Create a block device.
207
208     """
209     (bdev_s, size, owner, on_primary, info, excl_stor) = params
210     bdev = objects.Disk.FromDict(bdev_s)
211     if bdev is None:
212       raise ValueError("can't unserialize data!")
213     return backend.BlockdevCreate(bdev, size, owner, on_primary, info,
214                                   excl_stor)
215
216   @staticmethod
217   def perspective_blockdev_pause_resume_sync(params):
218     """Pause/resume sync of a block device.
219
220     """
221     disks_s, pause = params
222     disks = [objects.Disk.FromDict(bdev_s) for bdev_s in disks_s]
223     return backend.BlockdevPauseResumeSync(disks, pause)
224
225   @staticmethod
226   def perspective_blockdev_wipe(params):
227     """Wipe a block device.
228
229     """
230     bdev_s, offset, size = params
231     bdev = objects.Disk.FromDict(bdev_s)
232     return backend.BlockdevWipe(bdev, offset, size)
233
234   @staticmethod
235   def perspective_blockdev_remove(params):
236     """Remove a block device.
237
238     """
239     bdev_s = params[0]
240     bdev = objects.Disk.FromDict(bdev_s)
241     return backend.BlockdevRemove(bdev)
242
243   @staticmethod
244   def perspective_blockdev_rename(params):
245     """Remove a block device.
246
247     """
248     devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params[0]]
249     return backend.BlockdevRename(devlist)
250
251   @staticmethod
252   def perspective_blockdev_assemble(params):
253     """Assemble a block device.
254
255     """
256     bdev_s, owner, on_primary, idx = params
257     bdev = objects.Disk.FromDict(bdev_s)
258     if bdev is None:
259       raise ValueError("can't unserialize data!")
260     return backend.BlockdevAssemble(bdev, owner, on_primary, idx)
261
262   @staticmethod
263   def perspective_blockdev_shutdown(params):
264     """Shutdown a block device.
265
266     """
267     bdev_s = params[0]
268     bdev = objects.Disk.FromDict(bdev_s)
269     if bdev is None:
270       raise ValueError("can't unserialize data!")
271     return backend.BlockdevShutdown(bdev)
272
273   @staticmethod
274   def perspective_blockdev_addchildren(params):
275     """Add a child to a mirror device.
276
277     Note: this is only valid for mirror devices. It's the caller's duty
278     to send a correct disk, otherwise we raise an error.
279
280     """
281     bdev_s, ndev_s = params
282     bdev = objects.Disk.FromDict(bdev_s)
283     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
284     if bdev is None or ndevs.count(None) > 0:
285       raise ValueError("can't unserialize data!")
286     return backend.BlockdevAddchildren(bdev, ndevs)
287
288   @staticmethod
289   def perspective_blockdev_removechildren(params):
290     """Remove a child from a mirror device.
291
292     This is only valid for mirror devices, of course. It's the callers
293     duty to send a correct disk, otherwise we raise an error.
294
295     """
296     bdev_s, ndev_s = params
297     bdev = objects.Disk.FromDict(bdev_s)
298     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
299     if bdev is None or ndevs.count(None) > 0:
300       raise ValueError("can't unserialize data!")
301     return backend.BlockdevRemovechildren(bdev, ndevs)
302
303   @staticmethod
304   def perspective_blockdev_getmirrorstatus(params):
305     """Return the mirror status for a list of disks.
306
307     """
308     disks = [objects.Disk.FromDict(dsk_s)
309              for dsk_s in params[0]]
310     return [status.ToDict()
311             for status in backend.BlockdevGetmirrorstatus(disks)]
312
313   @staticmethod
314   def perspective_blockdev_getmirrorstatus_multi(params):
315     """Return the mirror status for a list of disks.
316
317     """
318     (node_disks, ) = params
319
320     disks = [objects.Disk.FromDict(dsk_s) for dsk_s in node_disks]
321
322     result = []
323
324     for (success, status) in backend.BlockdevGetmirrorstatusMulti(disks):
325       if success:
326         result.append((success, status.ToDict()))
327       else:
328         result.append((success, status))
329
330     return result
331
332   @staticmethod
333   def perspective_blockdev_find(params):
334     """Expose the FindBlockDevice functionality for a disk.
335
336     This will try to find but not activate a disk.
337
338     """
339     disk = objects.Disk.FromDict(params[0])
340
341     result = backend.BlockdevFind(disk)
342     if result is None:
343       return None
344
345     return result.ToDict()
346
347   @staticmethod
348   def perspective_blockdev_snapshot(params):
349     """Create a snapshot device.
350
351     Note that this is only valid for LVM disks, if we get passed
352     something else we raise an exception. The snapshot device can be
353     remove by calling the generic block device remove call.
354
355     """
356     cfbd = objects.Disk.FromDict(params[0])
357     return backend.BlockdevSnapshot(cfbd)
358
359   @staticmethod
360   def perspective_blockdev_grow(params):
361     """Grow a stack of devices.
362
363     """
364     if len(params) < 4:
365       raise ValueError("Received only 3 parameters in blockdev_grow,"
366                        " old master?")
367     cfbd = objects.Disk.FromDict(params[0])
368     amount = params[1]
369     dryrun = params[2]
370     backingstore = params[3]
371     return backend.BlockdevGrow(cfbd, amount, dryrun, backingstore)
372
373   @staticmethod
374   def perspective_blockdev_close(params):
375     """Closes the given block devices.
376
377     """
378     disks = [objects.Disk.FromDict(cf) for cf in params[1]]
379     return backend.BlockdevClose(params[0], disks)
380
381   @staticmethod
382   def perspective_blockdev_getdimensions(params):
383     """Compute the sizes of the given block devices.
384
385     """
386     disks = [objects.Disk.FromDict(cf) for cf in params[0]]
387     return backend.BlockdevGetdimensions(disks)
388
389   @staticmethod
390   def perspective_blockdev_export(params):
391     """Compute the sizes of the given block devices.
392
393     """
394     disk = objects.Disk.FromDict(params[0])
395     dest_node, dest_path, cluster_name = params[1:]
396     return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)
397
398   @staticmethod
399   def perspective_blockdev_setinfo(params):
400     """Sets metadata information on the given block device.
401
402     """
403     (disk, info) = params
404     disk = objects.Disk.FromDict(disk)
405     return backend.BlockdevSetInfo(disk, info)
406
407   # blockdev/drbd specific methods ----------
408
409   @staticmethod
410   def perspective_drbd_disconnect_net(params):
411     """Disconnects the network connection of drbd disks.
412
413     Note that this is only valid for drbd disks, so the members of the
414     disk list must all be drbd devices.
415
416     """
417     nodes_ip, disks = params
418     disks = [objects.Disk.FromDict(cf) for cf in disks]
419     return backend.DrbdDisconnectNet(nodes_ip, disks)
420
421   @staticmethod
422   def perspective_drbd_attach_net(params):
423     """Attaches the network connection of drbd disks.
424
425     Note that this is only valid for drbd disks, so the members of the
426     disk list must all be drbd devices.
427
428     """
429     nodes_ip, disks, instance_name, multimaster = params
430     disks = [objects.Disk.FromDict(cf) for cf in disks]
431     return backend.DrbdAttachNet(nodes_ip, disks,
432                                      instance_name, multimaster)
433
434   @staticmethod
435   def perspective_drbd_wait_sync(params):
436     """Wait until DRBD disks are synched.
437
438     Note that this is only valid for drbd disks, so the members of the
439     disk list must all be drbd devices.
440
441     """
442     nodes_ip, disks = params
443     disks = [objects.Disk.FromDict(cf) for cf in disks]
444     return backend.DrbdWaitSync(nodes_ip, disks)
445
446   @staticmethod
447   def perspective_drbd_helper(params):
448     """Query drbd helper.
449
450     """
451     return backend.GetDrbdUsermodeHelper()
452
453   # export/import  --------------------------
454
455   @staticmethod
456   def perspective_finalize_export(params):
457     """Expose the finalize export functionality.
458
459     """
460     instance = objects.Instance.FromDict(params[0])
461
462     snap_disks = []
463     for disk in params[1]:
464       if isinstance(disk, bool):
465         snap_disks.append(disk)
466       else:
467         snap_disks.append(objects.Disk.FromDict(disk))
468
469     return backend.FinalizeExport(instance, snap_disks)
470
471   @staticmethod
472   def perspective_export_info(params):
473     """Query information about an existing export on this node.
474
475     The given path may not contain an export, in which case we return
476     None.
477
478     """
479     path = params[0]
480     return backend.ExportInfo(path)
481
482   @staticmethod
483   def perspective_export_list(params):
484     """List the available exports on this node.
485
486     Note that as opposed to export_info, which may query data about an
487     export in any path, this only queries the standard Ganeti path
488     (pathutils.EXPORT_DIR).
489
490     """
491     return backend.ListExports()
492
493   @staticmethod
494   def perspective_export_remove(params):
495     """Remove an export.
496
497     """
498     export = params[0]
499     return backend.RemoveExport(export)
500
501   # block device ---------------------
502   @staticmethod
503   def perspective_bdev_sizes(params):
504     """Query the list of block devices
505
506     """
507     devices = params[0]
508     return backend.GetBlockDevSizes(devices)
509
510   # volume  --------------------------
511
512   @staticmethod
513   def perspective_lv_list(params):
514     """Query the list of logical volumes in a given volume group.
515
516     """
517     vgname = params[0]
518     return backend.GetVolumeList(vgname)
519
520   @staticmethod
521   def perspective_vg_list(params):
522     """Query the list of volume groups.
523
524     """
525     return backend.ListVolumeGroups()
526
527   # Storage --------------------------
528
529   @staticmethod
530   def perspective_storage_list(params):
531     """Get list of storage units.
532
533     """
534     (su_name, su_args, name, fields) = params
535     return container.GetStorage(su_name, *su_args).List(name, fields)
536
537   @staticmethod
538   def perspective_storage_modify(params):
539     """Modify a storage unit.
540
541     """
542     (su_name, su_args, name, changes) = params
543     return container.GetStorage(su_name, *su_args).Modify(name, changes)
544
545   @staticmethod
546   def perspective_storage_execute(params):
547     """Execute an operation on a storage unit.
548
549     """
550     (su_name, su_args, name, op) = params
551     return container.GetStorage(su_name, *su_args).Execute(name, op)
552
553   # bridge  --------------------------
554
555   @staticmethod
556   def perspective_bridges_exist(params):
557     """Check if all bridges given exist on this node.
558
559     """
560     bridges_list = params[0]
561     return backend.BridgesExist(bridges_list)
562
563   # instance  --------------------------
564
565   @staticmethod
566   def perspective_instance_os_add(params):
567     """Install an OS on a given instance.
568
569     """
570     inst_s = params[0]
571     inst = objects.Instance.FromDict(inst_s)
572     reinstall = params[1]
573     debug = params[2]
574     return backend.InstanceOsAdd(inst, reinstall, debug)
575
576   @staticmethod
577   def perspective_instance_run_rename(params):
578     """Runs the OS rename script for an instance.
579
580     """
581     inst_s, old_name, debug = params
582     inst = objects.Instance.FromDict(inst_s)
583     return backend.RunRenameInstance(inst, old_name, debug)
584
585   @staticmethod
586   def perspective_instance_shutdown(params):
587     """Shutdown an instance.
588
589     """
590     instance = objects.Instance.FromDict(params[0])
591     timeout = params[1]
592     trail = params[2]
593     _extendReasonTrail(trail, "shutdown")
594     return backend.InstanceShutdown(instance, timeout, trail)
595
596   @staticmethod
597   def perspective_instance_start(params):
598     """Start an instance.
599
600     """
601     (instance_name, startup_paused, trail) = params
602     instance = objects.Instance.FromDict(instance_name)
603     _extendReasonTrail(trail, "start")
604     return backend.StartInstance(instance, startup_paused, trail)
605
606   @staticmethod
607   def perspective_migration_info(params):
608     """Gather information about an instance to be migrated.
609
610     """
611     instance = objects.Instance.FromDict(params[0])
612     return backend.MigrationInfo(instance)
613
614   @staticmethod
615   def perspective_accept_instance(params):
616     """Prepare the node to accept an instance.
617
618     """
619     instance, info, target = params
620     instance = objects.Instance.FromDict(instance)
621     return backend.AcceptInstance(instance, info, target)
622
623   @staticmethod
624   def perspective_instance_finalize_migration_dst(params):
625     """Finalize the instance migration on the destination node.
626
627     """
628     instance, info, success = params
629     instance = objects.Instance.FromDict(instance)
630     return backend.FinalizeMigrationDst(instance, info, success)
631
632   @staticmethod
633   def perspective_instance_migrate(params):
634     """Migrates an instance.
635
636     """
637     cluster_name, instance, target, live = params
638     instance = objects.Instance.FromDict(instance)
639     return backend.MigrateInstance(cluster_name, instance, target, live)
640
641   @staticmethod
642   def perspective_instance_finalize_migration_src(params):
643     """Finalize the instance migration on the source node.
644
645     """
646     instance, success, live = params
647     instance = objects.Instance.FromDict(instance)
648     return backend.FinalizeMigrationSource(instance, success, live)
649
650   @staticmethod
651   def perspective_instance_get_migration_status(params):
652     """Reports migration status.
653
654     """
655     instance = objects.Instance.FromDict(params[0])
656     return backend.GetMigrationStatus(instance).ToDict()
657
658   @staticmethod
659   def perspective_instance_reboot(params):
660     """Reboot an instance.
661
662     """
663     instance = objects.Instance.FromDict(params[0])
664     reboot_type = params[1]
665     shutdown_timeout = params[2]
666     trail = params[3]
667     _extendReasonTrail(trail, "reboot")
668     return backend.InstanceReboot(instance, reboot_type, shutdown_timeout,
669                                   trail)
670
671   @staticmethod
672   def perspective_instance_balloon_memory(params):
673     """Modify instance runtime memory.
674
675     """
676     instance_dict, memory = params
677     instance = objects.Instance.FromDict(instance_dict)
678     return backend.InstanceBalloonMemory(instance, memory)
679
680   @staticmethod
681   def perspective_instance_info(params):
682     """Query instance information.
683
684     """
685     (instance_name, hypervisor_name, hvparams) = params
686     return backend.GetInstanceInfo(instance_name, hypervisor_name, hvparams)
687
688   @staticmethod
689   def perspective_instance_migratable(params):
690     """Query whether the specified instance can be migrated.
691
692     """
693     instance = objects.Instance.FromDict(params[0])
694     return backend.GetInstanceMigratable(instance)
695
696   @staticmethod
697   def perspective_all_instances_info(params):
698     """Query information about all instances.
699
700     """
701     (hypervisor_list, all_hvparams) = params
702     return backend.GetAllInstancesInfo(hypervisor_list, all_hvparams)
703
704   @staticmethod
705   def perspective_instance_list(params):
706     """Query the list of running instances.
707
708     """
709     (hypervisor_list, hvparams) = params
710     return backend.GetInstanceList(hypervisor_list, hvparams)
711
712   # node --------------------------
713
714   @staticmethod
715   def perspective_node_has_ip_address(params):
716     """Checks if a node has the given ip address.
717
718     """
719     return netutils.IPAddress.Own(params[0])
720
721   @staticmethod
722   def perspective_node_info(params):
723     """Query node information.
724
725     """
726     (storage_units, hv_specs, excl_stor) = params
727     return backend.GetNodeInfo(storage_units, hv_specs, excl_stor)
728
729   @staticmethod
730   def perspective_etc_hosts_modify(params):
731     """Modify a node entry in /etc/hosts.
732
733     """
734     backend.EtcHostsModify(params[0], params[1], params[2])
735
736     return True
737
738   @staticmethod
739   def perspective_node_verify(params):
740     """Run a verify sequence on this node.
741
742     """
743     (what, cluster_name, hvparams) = params
744     return backend.VerifyNode(what, cluster_name, hvparams)
745
746   @classmethod
747   def perspective_node_verify_light(cls, params):
748     """Run a light verify sequence on this node.
749
750     """
751     # So far it's the same as the normal node_verify
752     return cls.perspective_node_verify(params)
753
754   @staticmethod
755   def perspective_node_start_master_daemons(params):
756     """Start the master daemons on this node.
757
758     """
759     return backend.StartMasterDaemons(params[0])
760
761   @staticmethod
762   def perspective_node_activate_master_ip(params):
763     """Activate the master IP on this node.
764
765     """
766     master_params = objects.MasterNetworkParameters.FromDict(params[0])
767     return backend.ActivateMasterIp(master_params, params[1])
768
769   @staticmethod
770   def perspective_node_deactivate_master_ip(params):
771     """Deactivate the master IP on this node.
772
773     """
774     master_params = objects.MasterNetworkParameters.FromDict(params[0])
775     return backend.DeactivateMasterIp(master_params, params[1])
776
777   @staticmethod
778   def perspective_node_stop_master(params):
779     """Stops master daemons on this node.
780
781     """
782     return backend.StopMasterDaemons()
783
784   @staticmethod
785   def perspective_node_change_master_netmask(params):
786     """Change the master IP netmask.
787
788     """
789     return backend.ChangeMasterNetmask(params[0], params[1], params[2],
790                                        params[3])
791
792   @staticmethod
793   def perspective_node_leave_cluster(params):
794     """Cleanup after leaving a cluster.
795
796     """
797     return backend.LeaveCluster(params[0])
798
799   @staticmethod
800   def perspective_node_volumes(params):
801     """Query the list of all logical volume groups.
802
803     """
804     return backend.NodeVolumes()
805
806   @staticmethod
807   def perspective_node_demote_from_mc(params):
808     """Demote a node from the master candidate role.
809
810     """
811     return backend.DemoteFromMC()
812
813   @staticmethod
814   def perspective_node_powercycle(params):
815     """Tries to powercycle the nod.
816
817     """
818     (hypervisor_type, hvparams) = params
819     return backend.PowercycleNode(hypervisor_type, hvparams)
820
821   # cluster --------------------------
822
823   @staticmethod
824   def perspective_version(params):
825     """Query version information.
826
827     """
828     return constants.PROTOCOL_VERSION
829
830   @staticmethod
831   def perspective_upload_file(params):
832     """Upload a file.
833
834     Note that the backend implementation imposes strict rules on which
835     files are accepted.
836
837     """
838     return backend.UploadFile(*(params[0]))
839
840   @staticmethod
841   def perspective_master_info(params):
842     """Query master information.
843
844     """
845     return backend.GetMasterInfo()
846
847   @staticmethod
848   def perspective_run_oob(params):
849     """Runs oob on node.
850
851     """
852     output = backend.RunOob(params[0], params[1], params[2], params[3])
853     if output:
854       result = serializer.LoadJson(output)
855     else:
856       result = None
857     return result
858
859   @staticmethod
860   def perspective_restricted_command(params):
861     """Runs a restricted command.
862
863     """
864     (cmd, ) = params
865
866     return backend.RunRestrictedCmd(cmd)
867
868   @staticmethod
869   def perspective_write_ssconf_files(params):
870     """Write ssconf files.
871
872     """
873     (values,) = params
874     return ssconf.WriteSsconfFiles(values)
875
876   @staticmethod
877   def perspective_get_watcher_pause(params):
878     """Get watcher pause end.
879
880     """
881     return utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE)
882
883   @staticmethod
884   def perspective_set_watcher_pause(params):
885     """Set watcher pause.
886
887     """
888     (until, ) = params
889     return backend.SetWatcherPause(until)
890
891   # os -----------------------
892
893   @staticmethod
894   def perspective_os_diagnose(params):
895     """Query detailed information about existing OSes.
896
897     """
898     return backend.DiagnoseOS()
899
900   @staticmethod
901   def perspective_os_get(params):
902     """Query information about a given OS.
903
904     """
905     name = params[0]
906     os_obj = backend.OSFromDisk(name)
907     return os_obj.ToDict()
908
909   @staticmethod
910   def perspective_os_validate(params):
911     """Run a given OS' validation routine.
912
913     """
914     required, name, checks, params = params
915     return backend.ValidateOS(required, name, checks, params)
916
917   # extstorage -----------------------
918
919   @staticmethod
920   def perspective_extstorage_diagnose(params):
921     """Query detailed information about existing extstorage providers.
922
923     """
924     return backend.DiagnoseExtStorage()
925
926   # hooks -----------------------
927
928   @staticmethod
929   def perspective_hooks_runner(params):
930     """Run hook scripts.
931
932     """
933     hpath, phase, env = params
934     hr = backend.HooksRunner()
935     return hr.RunHooks(hpath, phase, env)
936
937   # iallocator -----------------
938
939   @staticmethod
940   def perspective_iallocator_runner(params):
941     """Run an iallocator script.
942
943     """
944     name, idata = params
945     iar = backend.IAllocatorRunner()
946     return iar.Run(name, idata)
947
948   # test -----------------------
949
950   @staticmethod
951   def perspective_test_delay(params):
952     """Run test delay.
953
954     """
955     duration = params[0]
956     status, rval = utils.TestDelay(duration)
957     if not status:
958       raise backend.RPCFail(rval)
959     return rval
960
961   # file storage ---------------
962
963   @staticmethod
964   def perspective_file_storage_dir_create(params):
965     """Create the file storage directory.
966
967     """
968     file_storage_dir = params[0]
969     return backend.CreateFileStorageDir(file_storage_dir)
970
971   @staticmethod
972   def perspective_file_storage_dir_remove(params):
973     """Remove the file storage directory.
974
975     """
976     file_storage_dir = params[0]
977     return backend.RemoveFileStorageDir(file_storage_dir)
978
979   @staticmethod
980   def perspective_file_storage_dir_rename(params):
981     """Rename the file storage directory.
982
983     """
984     old_file_storage_dir = params[0]
985     new_file_storage_dir = params[1]
986     return backend.RenameFileStorageDir(old_file_storage_dir,
987                                         new_file_storage_dir)
988
989   # jobs ------------------------
990
991   @staticmethod
992   @_RequireJobQueueLock
993   def perspective_jobqueue_update(params):
994     """Update job queue.
995
996     """
997     (file_name, content) = params
998     return backend.JobQueueUpdate(file_name, content)
999
1000   @staticmethod
1001   @_RequireJobQueueLock
1002   def perspective_jobqueue_purge(params):
1003     """Purge job queue.
1004
1005     """
1006     return backend.JobQueuePurge()
1007
1008   @staticmethod
1009   @_RequireJobQueueLock
1010   def perspective_jobqueue_rename(params):
1011     """Rename a job queue file.
1012
1013     """
1014     # TODO: What if a file fails to rename?
1015     return [backend.JobQueueRename(old, new) for old, new in params[0]]
1016
1017   @staticmethod
1018   @_RequireJobQueueLock
1019   def perspective_jobqueue_set_drain_flag(params):
1020     """Set job queue's drain flag.
1021
1022     """
1023     (flag, ) = params
1024
1025     return jstore.SetDrainFlag(flag)
1026
1027   # hypervisor ---------------
1028
1029   @staticmethod
1030   def perspective_hypervisor_validate_params(params):
1031     """Validate the hypervisor parameters.
1032
1033     """
1034     (hvname, hvparams) = params
1035     return backend.ValidateHVParams(hvname, hvparams)
1036
1037   # Crypto
1038
1039   @staticmethod
1040   def perspective_x509_cert_create(params):
1041     """Creates a new X509 certificate for SSL/TLS.
1042
1043     """
1044     (validity, ) = params
1045     return backend.CreateX509Certificate(validity)
1046
1047   @staticmethod
1048   def perspective_x509_cert_remove(params):
1049     """Removes a X509 certificate.
1050
1051     """
1052     (name, ) = params
1053     return backend.RemoveX509Certificate(name)
1054
1055   # Import and export
1056
1057   @staticmethod
1058   def perspective_import_start(params):
1059     """Starts an import daemon.
1060
1061     """
1062     (opts_s, instance, component, (dest, dest_args)) = params
1063
1064     opts = objects.ImportExportOptions.FromDict(opts_s)
1065
1066     return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts,
1067                                            None, None,
1068                                            objects.Instance.FromDict(instance),
1069                                            component, dest,
1070                                            _DecodeImportExportIO(dest,
1071                                                                  dest_args))
1072
1073   @staticmethod
1074   def perspective_export_start(params):
1075     """Starts an export daemon.
1076
1077     """
1078     (opts_s, host, port, instance, component, (source, source_args)) = params
1079
1080     opts = objects.ImportExportOptions.FromDict(opts_s)
1081
1082     return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts,
1083                                            host, port,
1084                                            objects.Instance.FromDict(instance),
1085                                            component, source,
1086                                            _DecodeImportExportIO(source,
1087                                                                  source_args))
1088
1089   @staticmethod
1090   def perspective_impexp_status(params):
1091     """Retrieves the status of an import or export daemon.
1092
1093     """
1094     return backend.GetImportExportStatus(params[0])
1095
1096   @staticmethod
1097   def perspective_impexp_abort(params):
1098     """Aborts an import or export.
1099
1100     """
1101     return backend.AbortImportExport(params[0])
1102
1103   @staticmethod
1104   def perspective_impexp_cleanup(params):
1105     """Cleans up after an import or export.
1106
1107     """
1108     return backend.CleanupImportExport(params[0])
1109
1110
1111 def CheckNoded(_, args):
1112   """Initial checks whether to run or exit with a failure.
1113
1114   """
1115   if args: # noded doesn't take any arguments
1116     print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
1117                           sys.argv[0])
1118     sys.exit(constants.EXIT_FAILURE)
1119   try:
1120     codecs.lookup("string-escape")
1121   except LookupError:
1122     print >> sys.stderr, ("Can't load the string-escape code which is part"
1123                           " of the Python installation. Is your installation"
1124                           " complete/correct? Aborting.")
1125     sys.exit(constants.EXIT_FAILURE)
1126
1127
1128 def PrepNoded(options, _):
1129   """Preparation node daemon function, executed with the PID file held.
1130
1131   """
1132   if options.mlock:
1133     request_executor_class = MlockallRequestExecutor
1134     try:
1135       utils.Mlockall()
1136     except errors.NoCtypesError:
1137       logging.warning("Cannot set memory lock, ctypes module not found")
1138       request_executor_class = http.server.HttpServerRequestExecutor
1139   else:
1140     request_executor_class = http.server.HttpServerRequestExecutor
1141
1142   # Read SSL certificate
1143   if options.ssl:
1144     ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
1145                                     ssl_cert_path=options.ssl_cert)
1146   else:
1147     ssl_params = None
1148
1149   err = _PrepareQueueLock()
1150   if err is not None:
1151     # this might be some kind of file-system/permission error; while
1152     # this breaks the job queue functionality, we shouldn't prevent
1153     # startup of the whole node daemon because of this
1154     logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)
1155
1156   handler = NodeRequestHandler()
1157
1158   mainloop = daemon.Mainloop()
1159   server = \
1160     http.server.HttpServer(mainloop, options.bind_address, options.port,
1161                            handler, ssl_params=ssl_params, ssl_verify_peer=True,
1162                            request_executor_class=request_executor_class)
1163   server.Start()
1164
1165   return (mainloop, server)
1166
1167
1168 def ExecNoded(options, args, prep_data): # pylint: disable=W0613
1169   """Main node daemon function, executed with the PID file held.
1170
1171   """
1172   (mainloop, server) = prep_data
1173   try:
1174     mainloop.Run()
1175   finally:
1176     server.Stop()
1177
1178
1179 def Main():
1180   """Main function for the node daemon.
1181
1182   """
1183   parser = OptionParser(description="Ganeti node daemon",
1184                         usage="%prog [-f] [-d] [-p port] [-b ADDRESS]\
1185                               \ [-i INTERFACE]",
1186                         version="%%prog (ganeti) %s" %
1187                         constants.RELEASE_VERSION)
1188   parser.add_option("--no-mlock", dest="mlock",
1189                     help="Do not mlock the node memory in ram",
1190                     default=True, action="store_false")
1191
1192   daemon.GenericMain(constants.NODED, parser, CheckNoded, PrepNoded, ExecNoded,
1193                      default_ssl_cert=pathutils.NODED_CERT_FILE,
1194                      default_ssl_key=pathutils.NODED_CERT_FILE,
1195                      console_logging=True)