Reason trail implementation for "start"
[ganeti-local] / lib / server / noded.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti node daemon"""
23
24 # pylint: disable=C0103,W0142
25
26 # C0103: Functions in this module need to have a given name structure,
27 # and the name of the daemon doesn't match
28
29 # W0142: Used * or ** magic, since we do use it extensively in this
30 # module
31
32 import os
33 import sys
34 import logging
35 import signal
36 import codecs
37
38 from optparse import OptionParser
39
40 from ganeti import backend
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import errors
44 from ganeti import jstore
45 from ganeti import daemon
46 from ganeti import http
47 from ganeti import utils
48 from ganeti import storage
49 from ganeti import serializer
50 from ganeti import netutils
51 from ganeti import pathutils
52 from ganeti import ssconf
53
54 import ganeti.http.server # pylint: disable=W0611
55
56
57 queue_lock = None
58
59
60 def _extendReasonTrail(trail, source, reason=""):
61   """Extend the reason trail with noded information
62
63   The trail is extended by appending the name of the noded functionality
64   """
65   assert trail is not None
66   trail_source = "%s:%s" % (constants.OPCODE_REASON_SRC_NODED, source)
67   trail.append((trail_source, reason, utils.EpochNano()))
68
69
70 def _PrepareQueueLock():
71   """Try to prepare the queue lock.
72
73   @return: None for success, otherwise an exception object
74
75   """
76   global queue_lock # pylint: disable=W0603
77
78   if queue_lock is not None:
79     return None
80
81   # Prepare job queue
82   try:
83     queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
84     return None
85   except EnvironmentError, err:
86     return err
87
88
89 def _RequireJobQueueLock(fn):
90   """Decorator for job queue manipulating functions.
91
92   """
93   QUEUE_LOCK_TIMEOUT = 10
94
95   def wrapper(*args, **kwargs):
96     # Locking in exclusive, blocking mode because there could be several
97     # children running at the same time. Waiting up to 10 seconds.
98     if _PrepareQueueLock() is not None:
99       raise errors.JobQueueError("Job queue failed initialization,"
100                                  " cannot update jobs")
101     queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
102     try:
103       return fn(*args, **kwargs)
104     finally:
105       queue_lock.Unlock()
106
107   return wrapper
108
109
110 def _DecodeImportExportIO(ieio, ieioargs):
111   """Decodes import/export I/O information.
112
113   """
114   if ieio == constants.IEIO_RAW_DISK:
115     assert len(ieioargs) == 1
116     return (objects.Disk.FromDict(ieioargs[0]), )
117
118   if ieio == constants.IEIO_SCRIPT:
119     assert len(ieioargs) == 2
120     return (objects.Disk.FromDict(ieioargs[0]), ieioargs[1])
121
122   return ieioargs
123
124
125 def _DefaultAlternative(value, default):
126   """Returns value or, if evaluating to False, a default value.
127
128   Returns the given value, unless it evaluates to False. In the latter case the
129   default value is returned.
130
131   @param value: Value to return if it doesn't evaluate to False
132   @param default: Default value
133   @return: Given value or the default
134
135   """
136   if value:
137     return value
138
139   return default
140
141
142 class MlockallRequestExecutor(http.server.HttpServerRequestExecutor):
143   """Subclass ensuring request handlers are locked in RAM.
144
145   """
146   def __init__(self, *args, **kwargs):
147     utils.Mlockall()
148
149     http.server.HttpServerRequestExecutor.__init__(self, *args, **kwargs)
150
151
152 class NodeRequestHandler(http.server.HttpServerHandler):
153   """The server implementation.
154
155   This class holds all methods exposed over the RPC interface.
156
157   """
158   # too many public methods, and unused args - all methods get params
159   # due to the API
160   # pylint: disable=R0904,W0613
161   def __init__(self):
162     http.server.HttpServerHandler.__init__(self)
163     self.noded_pid = os.getpid()
164
165   def HandleRequest(self, req):
166     """Handle a request.
167
168     """
169     if req.request_method.upper() != http.HTTP_POST:
170       raise http.HttpBadRequest("Only the POST method is supported")
171
172     path = req.request_path
173     if path.startswith("/"):
174       path = path[1:]
175
176     method = getattr(self, "perspective_%s" % path, None)
177     if method is None:
178       raise http.HttpNotFound()
179
180     try:
181       result = (True, method(serializer.LoadJson(req.request_body)))
182
183     except backend.RPCFail, err:
184       # our custom failure exception; str(err) works fine if the
185       # exception was constructed with a single argument, and in
186       # this case, err.message == err.args[0] == str(err)
187       result = (False, str(err))
188     except errors.QuitGanetiException, err:
189       # Tell parent to quit
190       logging.info("Shutting down the node daemon, arguments: %s",
191                    str(err.args))
192       os.kill(self.noded_pid, signal.SIGTERM)
193       # And return the error's arguments, which must be already in
194       # correct tuple format
195       result = err.args
196     except Exception, err:
197       logging.exception("Error in RPC call")
198       result = (False, "Error while executing backend function: %s" % str(err))
199
200     return serializer.DumpJson(result)
201
202   # the new block devices  --------------------------
203
204   @staticmethod
205   def perspective_blockdev_create(params):
206     """Create a block device.
207
208     """
209     (bdev_s, size, owner, on_primary, info, excl_stor) = params
210     bdev = objects.Disk.FromDict(bdev_s)
211     if bdev is None:
212       raise ValueError("can't unserialize data!")
213     return backend.BlockdevCreate(bdev, size, owner, on_primary, info,
214                                   excl_stor)
215
216   @staticmethod
217   def perspective_blockdev_pause_resume_sync(params):
218     """Pause/resume sync of a block device.
219
220     """
221     disks_s, pause = params
222     disks = [objects.Disk.FromDict(bdev_s) for bdev_s in disks_s]
223     return backend.BlockdevPauseResumeSync(disks, pause)
224
225   @staticmethod
226   def perspective_blockdev_wipe(params):
227     """Wipe a block device.
228
229     """
230     bdev_s, offset, size = params
231     bdev = objects.Disk.FromDict(bdev_s)
232     return backend.BlockdevWipe(bdev, offset, size)
233
234   @staticmethod
235   def perspective_blockdev_remove(params):
236     """Remove a block device.
237
238     """
239     bdev_s = params[0]
240     bdev = objects.Disk.FromDict(bdev_s)
241     return backend.BlockdevRemove(bdev)
242
243   @staticmethod
244   def perspective_blockdev_rename(params):
245     """Remove a block device.
246
247     """
248     devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params[0]]
249     return backend.BlockdevRename(devlist)
250
251   @staticmethod
252   def perspective_blockdev_assemble(params):
253     """Assemble a block device.
254
255     """
256     bdev_s, owner, on_primary, idx = params
257     bdev = objects.Disk.FromDict(bdev_s)
258     if bdev is None:
259       raise ValueError("can't unserialize data!")
260     return backend.BlockdevAssemble(bdev, owner, on_primary, idx)
261
262   @staticmethod
263   def perspective_blockdev_shutdown(params):
264     """Shutdown a block device.
265
266     """
267     bdev_s = params[0]
268     bdev = objects.Disk.FromDict(bdev_s)
269     if bdev is None:
270       raise ValueError("can't unserialize data!")
271     return backend.BlockdevShutdown(bdev)
272
273   @staticmethod
274   def perspective_blockdev_addchildren(params):
275     """Add a child to a mirror device.
276
277     Note: this is only valid for mirror devices. It's the caller's duty
278     to send a correct disk, otherwise we raise an error.
279
280     """
281     bdev_s, ndev_s = params
282     bdev = objects.Disk.FromDict(bdev_s)
283     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
284     if bdev is None or ndevs.count(None) > 0:
285       raise ValueError("can't unserialize data!")
286     return backend.BlockdevAddchildren(bdev, ndevs)
287
288   @staticmethod
289   def perspective_blockdev_removechildren(params):
290     """Remove a child from a mirror device.
291
292     This is only valid for mirror devices, of course. It's the callers
293     duty to send a correct disk, otherwise we raise an error.
294
295     """
296     bdev_s, ndev_s = params
297     bdev = objects.Disk.FromDict(bdev_s)
298     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
299     if bdev is None or ndevs.count(None) > 0:
300       raise ValueError("can't unserialize data!")
301     return backend.BlockdevRemovechildren(bdev, ndevs)
302
303   @staticmethod
304   def perspective_blockdev_getmirrorstatus(params):
305     """Return the mirror status for a list of disks.
306
307     """
308     disks = [objects.Disk.FromDict(dsk_s)
309              for dsk_s in params[0]]
310     return [status.ToDict()
311             for status in backend.BlockdevGetmirrorstatus(disks)]
312
313   @staticmethod
314   def perspective_blockdev_getmirrorstatus_multi(params):
315     """Return the mirror status for a list of disks.
316
317     """
318     (node_disks, ) = params
319
320     disks = [objects.Disk.FromDict(dsk_s) for dsk_s in node_disks]
321
322     result = []
323
324     for (success, status) in backend.BlockdevGetmirrorstatusMulti(disks):
325       if success:
326         result.append((success, status.ToDict()))
327       else:
328         result.append((success, status))
329
330     return result
331
332   @staticmethod
333   def perspective_blockdev_find(params):
334     """Expose the FindBlockDevice functionality for a disk.
335
336     This will try to find but not activate a disk.
337
338     """
339     disk = objects.Disk.FromDict(params[0])
340
341     result = backend.BlockdevFind(disk)
342     if result is None:
343       return None
344
345     return result.ToDict()
346
347   @staticmethod
348   def perspective_blockdev_snapshot(params):
349     """Create a snapshot device.
350
351     Note that this is only valid for LVM disks, if we get passed
352     something else we raise an exception. The snapshot device can be
353     remove by calling the generic block device remove call.
354
355     """
356     cfbd = objects.Disk.FromDict(params[0])
357     return backend.BlockdevSnapshot(cfbd)
358
359   @staticmethod
360   def perspective_blockdev_grow(params):
361     """Grow a stack of devices.
362
363     """
364     if len(params) < 4:
365       raise ValueError("Received only 3 parameters in blockdev_grow,"
366                        " old master?")
367     cfbd = objects.Disk.FromDict(params[0])
368     amount = params[1]
369     dryrun = params[2]
370     backingstore = params[3]
371     return backend.BlockdevGrow(cfbd, amount, dryrun, backingstore)
372
373   @staticmethod
374   def perspective_blockdev_close(params):
375     """Closes the given block devices.
376
377     """
378     disks = [objects.Disk.FromDict(cf) for cf in params[1]]
379     return backend.BlockdevClose(params[0], disks)
380
381   @staticmethod
382   def perspective_blockdev_getsize(params):
383     """Compute the sizes of the given block devices.
384
385     """
386     disks = [objects.Disk.FromDict(cf) for cf in params[0]]
387     return backend.BlockdevGetsize(disks)
388
389   @staticmethod
390   def perspective_blockdev_export(params):
391     """Compute the sizes of the given block devices.
392
393     """
394     disk = objects.Disk.FromDict(params[0])
395     dest_node, dest_path, cluster_name = params[1:]
396     return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)
397
398   @staticmethod
399   def perspective_blockdev_setinfo(params):
400     """Sets metadata information on the given block device.
401
402     """
403     (disk, info) = params
404     disk = objects.Disk.FromDict(disk)
405     return backend.BlockdevSetInfo(disk, info)
406
407   # blockdev/drbd specific methods ----------
408
409   @staticmethod
410   def perspective_drbd_disconnect_net(params):
411     """Disconnects the network connection of drbd disks.
412
413     Note that this is only valid for drbd disks, so the members of the
414     disk list must all be drbd devices.
415
416     """
417     nodes_ip, disks = params
418     disks = [objects.Disk.FromDict(cf) for cf in disks]
419     return backend.DrbdDisconnectNet(nodes_ip, disks)
420
421   @staticmethod
422   def perspective_drbd_attach_net(params):
423     """Attaches the network connection of drbd disks.
424
425     Note that this is only valid for drbd disks, so the members of the
426     disk list must all be drbd devices.
427
428     """
429     nodes_ip, disks, instance_name, multimaster = params
430     disks = [objects.Disk.FromDict(cf) for cf in disks]
431     return backend.DrbdAttachNet(nodes_ip, disks,
432                                      instance_name, multimaster)
433
434   @staticmethod
435   def perspective_drbd_wait_sync(params):
436     """Wait until DRBD disks are synched.
437
438     Note that this is only valid for drbd disks, so the members of the
439     disk list must all be drbd devices.
440
441     """
442     nodes_ip, disks = params
443     disks = [objects.Disk.FromDict(cf) for cf in disks]
444     return backend.DrbdWaitSync(nodes_ip, disks)
445
446   @staticmethod
447   def perspective_drbd_helper(params):
448     """Query drbd helper.
449
450     """
451     return backend.GetDrbdUsermodeHelper()
452
453   # export/import  --------------------------
454
455   @staticmethod
456   def perspective_finalize_export(params):
457     """Expose the finalize export functionality.
458
459     """
460     instance = objects.Instance.FromDict(params[0])
461
462     snap_disks = []
463     for disk in params[1]:
464       if isinstance(disk, bool):
465         snap_disks.append(disk)
466       else:
467         snap_disks.append(objects.Disk.FromDict(disk))
468
469     return backend.FinalizeExport(instance, snap_disks)
470
471   @staticmethod
472   def perspective_export_info(params):
473     """Query information about an existing export on this node.
474
475     The given path may not contain an export, in which case we return
476     None.
477
478     """
479     path = params[0]
480     return backend.ExportInfo(path)
481
482   @staticmethod
483   def perspective_export_list(params):
484     """List the available exports on this node.
485
486     Note that as opposed to export_info, which may query data about an
487     export in any path, this only queries the standard Ganeti path
488     (pathutils.EXPORT_DIR).
489
490     """
491     return backend.ListExports()
492
493   @staticmethod
494   def perspective_export_remove(params):
495     """Remove an export.
496
497     """
498     export = params[0]
499     return backend.RemoveExport(export)
500
501   # block device ---------------------
502   @staticmethod
503   def perspective_bdev_sizes(params):
504     """Query the list of block devices
505
506     """
507     devices = params[0]
508     return backend.GetBlockDevSizes(devices)
509
510   # volume  --------------------------
511
512   @staticmethod
513   def perspective_lv_list(params):
514     """Query the list of logical volumes in a given volume group.
515
516     """
517     vgname = params[0]
518     return backend.GetVolumeList(vgname)
519
520   @staticmethod
521   def perspective_vg_list(params):
522     """Query the list of volume groups.
523
524     """
525     return backend.ListVolumeGroups()
526
527   # Storage --------------------------
528
529   @staticmethod
530   def perspective_storage_list(params):
531     """Get list of storage units.
532
533     """
534     (su_name, su_args, name, fields) = params
535     return storage.GetStorage(su_name, *su_args).List(name, fields)
536
537   @staticmethod
538   def perspective_storage_modify(params):
539     """Modify a storage unit.
540
541     """
542     (su_name, su_args, name, changes) = params
543     return storage.GetStorage(su_name, *su_args).Modify(name, changes)
544
545   @staticmethod
546   def perspective_storage_execute(params):
547     """Execute an operation on a storage unit.
548
549     """
550     (su_name, su_args, name, op) = params
551     return storage.GetStorage(su_name, *su_args).Execute(name, op)
552
553   # bridge  --------------------------
554
555   @staticmethod
556   def perspective_bridges_exist(params):
557     """Check if all bridges given exist on this node.
558
559     """
560     bridges_list = params[0]
561     return backend.BridgesExist(bridges_list)
562
563   # instance  --------------------------
564
565   @staticmethod
566   def perspective_instance_os_add(params):
567     """Install an OS on a given instance.
568
569     """
570     inst_s = params[0]
571     inst = objects.Instance.FromDict(inst_s)
572     reinstall = params[1]
573     debug = params[2]
574     return backend.InstanceOsAdd(inst, reinstall, debug)
575
576   @staticmethod
577   def perspective_instance_run_rename(params):
578     """Runs the OS rename script for an instance.
579
580     """
581     inst_s, old_name, debug = params
582     inst = objects.Instance.FromDict(inst_s)
583     return backend.RunRenameInstance(inst, old_name, debug)
584
585   @staticmethod
586   def perspective_instance_shutdown(params):
587     """Shutdown an instance.
588
589     """
590     instance = objects.Instance.FromDict(params[0])
591     timeout = params[1]
592     trail = params[2]
593     _extendReasonTrail(trail, "shutdown")
594     return backend.InstanceShutdown(instance, timeout, trail)
595
596   @staticmethod
597   def perspective_instance_start(params):
598     """Start an instance.
599
600     """
601     (instance_name, startup_paused, trail) = params
602     instance = objects.Instance.FromDict(instance_name)
603     _extendReasonTrail(trail, "start")
604     return backend.StartInstance(instance, startup_paused, trail)
605
606   @staticmethod
607   def perspective_migration_info(params):
608     """Gather information about an instance to be migrated.
609
610     """
611     instance = objects.Instance.FromDict(params[0])
612     return backend.MigrationInfo(instance)
613
614   @staticmethod
615   def perspective_accept_instance(params):
616     """Prepare the node to accept an instance.
617
618     """
619     instance, info, target = params
620     instance = objects.Instance.FromDict(instance)
621     return backend.AcceptInstance(instance, info, target)
622
623   @staticmethod
624   def perspective_instance_finalize_migration_dst(params):
625     """Finalize the instance migration on the destination node.
626
627     """
628     instance, info, success = params
629     instance = objects.Instance.FromDict(instance)
630     return backend.FinalizeMigrationDst(instance, info, success)
631
632   @staticmethod
633   def perspective_instance_migrate(params):
634     """Migrates an instance.
635
636     """
637     instance, target, live = params
638     instance = objects.Instance.FromDict(instance)
639     return backend.MigrateInstance(instance, target, live)
640
641   @staticmethod
642   def perspective_instance_finalize_migration_src(params):
643     """Finalize the instance migration on the source node.
644
645     """
646     instance, success, live = params
647     instance = objects.Instance.FromDict(instance)
648     return backend.FinalizeMigrationSource(instance, success, live)
649
650   @staticmethod
651   def perspective_instance_get_migration_status(params):
652     """Reports migration status.
653
654     """
655     instance = objects.Instance.FromDict(params[0])
656     return backend.GetMigrationStatus(instance).ToDict()
657
658   @staticmethod
659   def perspective_instance_reboot(params):
660     """Reboot an instance.
661
662     """
663     instance = objects.Instance.FromDict(params[0])
664     reboot_type = params[1]
665     shutdown_timeout = params[2]
666     trail = params[3]
667     _extendReasonTrail(trail, "reboot")
668     return backend.InstanceReboot(instance, reboot_type, shutdown_timeout,
669                                   trail)
670
671   @staticmethod
672   def perspective_instance_balloon_memory(params):
673     """Modify instance runtime memory.
674
675     """
676     instance_dict, memory = params
677     instance = objects.Instance.FromDict(instance_dict)
678     return backend.InstanceBalloonMemory(instance, memory)
679
680   @staticmethod
681   def perspective_instance_info(params):
682     """Query instance information.
683
684     """
685     return backend.GetInstanceInfo(params[0], params[1])
686
687   @staticmethod
688   def perspective_instance_migratable(params):
689     """Query whether the specified instance can be migrated.
690
691     """
692     instance = objects.Instance.FromDict(params[0])
693     return backend.GetInstanceMigratable(instance)
694
695   @staticmethod
696   def perspective_all_instances_info(params):
697     """Query information about all instances.
698
699     """
700     return backend.GetAllInstancesInfo(params[0])
701
702   @staticmethod
703   def perspective_instance_list(params):
704     """Query the list of running instances.
705
706     """
707     return backend.GetInstanceList(params[0])
708
709   # node --------------------------
710
711   @staticmethod
712   def perspective_node_has_ip_address(params):
713     """Checks if a node has the given ip address.
714
715     """
716     return netutils.IPAddress.Own(params[0])
717
718   @staticmethod
719   def perspective_node_info(params):
720     """Query node information.
721
722     """
723     (vg_names, hv_names, excl_stor) = params
724     return backend.GetNodeInfo(vg_names, hv_names, excl_stor)
725
726   @staticmethod
727   def perspective_etc_hosts_modify(params):
728     """Modify a node entry in /etc/hosts.
729
730     """
731     backend.EtcHostsModify(params[0], params[1], params[2])
732
733     return True
734
735   @staticmethod
736   def perspective_node_verify(params):
737     """Run a verify sequence on this node.
738
739     """
740     return backend.VerifyNode(params[0], params[1])
741
742   @classmethod
743   def perspective_node_verify_light(cls, params):
744     """Run a light verify sequence on this node.
745
746     """
747     # So far it's the same as the normal node_verify
748     return cls.perspective_node_verify(params)
749
750   @staticmethod
751   def perspective_node_start_master_daemons(params):
752     """Start the master daemons on this node.
753
754     """
755     return backend.StartMasterDaemons(params[0])
756
757   @staticmethod
758   def perspective_node_activate_master_ip(params):
759     """Activate the master IP on this node.
760
761     """
762     master_params = objects.MasterNetworkParameters.FromDict(params[0])
763     return backend.ActivateMasterIp(master_params, params[1])
764
765   @staticmethod
766   def perspective_node_deactivate_master_ip(params):
767     """Deactivate the master IP on this node.
768
769     """
770     master_params = objects.MasterNetworkParameters.FromDict(params[0])
771     return backend.DeactivateMasterIp(master_params, params[1])
772
773   @staticmethod
774   def perspective_node_stop_master(params):
775     """Stops master daemons on this node.
776
777     """
778     return backend.StopMasterDaemons()
779
780   @staticmethod
781   def perspective_node_change_master_netmask(params):
782     """Change the master IP netmask.
783
784     """
785     return backend.ChangeMasterNetmask(params[0], params[1], params[2],
786                                        params[3])
787
788   @staticmethod
789   def perspective_node_leave_cluster(params):
790     """Cleanup after leaving a cluster.
791
792     """
793     return backend.LeaveCluster(params[0])
794
795   @staticmethod
796   def perspective_node_volumes(params):
797     """Query the list of all logical volume groups.
798
799     """
800     return backend.NodeVolumes()
801
802   @staticmethod
803   def perspective_node_demote_from_mc(params):
804     """Demote a node from the master candidate role.
805
806     """
807     return backend.DemoteFromMC()
808
809   @staticmethod
810   def perspective_node_powercycle(params):
811     """Tries to powercycle the nod.
812
813     """
814     hypervisor_type = params[0]
815     return backend.PowercycleNode(hypervisor_type)
816
817   # cluster --------------------------
818
819   @staticmethod
820   def perspective_version(params):
821     """Query version information.
822
823     """
824     return constants.PROTOCOL_VERSION
825
826   @staticmethod
827   def perspective_upload_file(params):
828     """Upload a file.
829
830     Note that the backend implementation imposes strict rules on which
831     files are accepted.
832
833     """
834     return backend.UploadFile(*(params[0]))
835
836   @staticmethod
837   def perspective_master_info(params):
838     """Query master information.
839
840     """
841     return backend.GetMasterInfo()
842
843   @staticmethod
844   def perspective_run_oob(params):
845     """Runs oob on node.
846
847     """
848     output = backend.RunOob(params[0], params[1], params[2], params[3])
849     if output:
850       result = serializer.LoadJson(output)
851     else:
852       result = None
853     return result
854
855   @staticmethod
856   def perspective_restricted_command(params):
857     """Runs a restricted command.
858
859     """
860     (cmd, ) = params
861
862     return backend.RunRestrictedCmd(cmd)
863
864   @staticmethod
865   def perspective_write_ssconf_files(params):
866     """Write ssconf files.
867
868     """
869     (values,) = params
870     return ssconf.WriteSsconfFiles(values)
871
872   @staticmethod
873   def perspective_get_watcher_pause(params):
874     """Get watcher pause end.
875
876     """
877     return utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE)
878
879   @staticmethod
880   def perspective_set_watcher_pause(params):
881     """Set watcher pause.
882
883     """
884     (until, ) = params
885     return backend.SetWatcherPause(until)
886
887   # os -----------------------
888
889   @staticmethod
890   def perspective_os_diagnose(params):
891     """Query detailed information about existing OSes.
892
893     """
894     return backend.DiagnoseOS()
895
896   @staticmethod
897   def perspective_os_get(params):
898     """Query information about a given OS.
899
900     """
901     name = params[0]
902     os_obj = backend.OSFromDisk(name)
903     return os_obj.ToDict()
904
905   @staticmethod
906   def perspective_os_validate(params):
907     """Run a given OS' validation routine.
908
909     """
910     required, name, checks, params = params
911     return backend.ValidateOS(required, name, checks, params)
912
913   # extstorage -----------------------
914
915   @staticmethod
916   def perspective_extstorage_diagnose(params):
917     """Query detailed information about existing extstorage providers.
918
919     """
920     return backend.DiagnoseExtStorage()
921
922   # hooks -----------------------
923
924   @staticmethod
925   def perspective_hooks_runner(params):
926     """Run hook scripts.
927
928     """
929     hpath, phase, env = params
930     hr = backend.HooksRunner()
931     return hr.RunHooks(hpath, phase, env)
932
933   # iallocator -----------------
934
935   @staticmethod
936   def perspective_iallocator_runner(params):
937     """Run an iallocator script.
938
939     """
940     name, idata = params
941     iar = backend.IAllocatorRunner()
942     return iar.Run(name, idata)
943
944   # test -----------------------
945
946   @staticmethod
947   def perspective_test_delay(params):
948     """Run test delay.
949
950     """
951     duration = params[0]
952     status, rval = utils.TestDelay(duration)
953     if not status:
954       raise backend.RPCFail(rval)
955     return rval
956
957   # file storage ---------------
958
959   @staticmethod
960   def perspective_file_storage_dir_create(params):
961     """Create the file storage directory.
962
963     """
964     file_storage_dir = params[0]
965     return backend.CreateFileStorageDir(file_storage_dir)
966
967   @staticmethod
968   def perspective_file_storage_dir_remove(params):
969     """Remove the file storage directory.
970
971     """
972     file_storage_dir = params[0]
973     return backend.RemoveFileStorageDir(file_storage_dir)
974
975   @staticmethod
976   def perspective_file_storage_dir_rename(params):
977     """Rename the file storage directory.
978
979     """
980     old_file_storage_dir = params[0]
981     new_file_storage_dir = params[1]
982     return backend.RenameFileStorageDir(old_file_storage_dir,
983                                         new_file_storage_dir)
984
985   # jobs ------------------------
986
987   @staticmethod
988   @_RequireJobQueueLock
989   def perspective_jobqueue_update(params):
990     """Update job queue.
991
992     """
993     (file_name, content) = params
994     return backend.JobQueueUpdate(file_name, content)
995
996   @staticmethod
997   @_RequireJobQueueLock
998   def perspective_jobqueue_purge(params):
999     """Purge job queue.
1000
1001     """
1002     return backend.JobQueuePurge()
1003
1004   @staticmethod
1005   @_RequireJobQueueLock
1006   def perspective_jobqueue_rename(params):
1007     """Rename a job queue file.
1008
1009     """
1010     # TODO: What if a file fails to rename?
1011     return [backend.JobQueueRename(old, new) for old, new in params[0]]
1012
1013   @staticmethod
1014   @_RequireJobQueueLock
1015   def perspective_jobqueue_set_drain_flag(params):
1016     """Set job queue's drain flag.
1017
1018     """
1019     (flag, ) = params
1020
1021     return jstore.SetDrainFlag(flag)
1022
1023   # hypervisor ---------------
1024
1025   @staticmethod
1026   def perspective_hypervisor_validate_params(params):
1027     """Validate the hypervisor parameters.
1028
1029     """
1030     (hvname, hvparams) = params
1031     return backend.ValidateHVParams(hvname, hvparams)
1032
1033   # Crypto
1034
1035   @staticmethod
1036   def perspective_x509_cert_create(params):
1037     """Creates a new X509 certificate for SSL/TLS.
1038
1039     """
1040     (validity, ) = params
1041     return backend.CreateX509Certificate(validity)
1042
1043   @staticmethod
1044   def perspective_x509_cert_remove(params):
1045     """Removes a X509 certificate.
1046
1047     """
1048     (name, ) = params
1049     return backend.RemoveX509Certificate(name)
1050
1051   # Import and export
1052
1053   @staticmethod
1054   def perspective_import_start(params):
1055     """Starts an import daemon.
1056
1057     """
1058     (opts_s, instance, component, (dest, dest_args)) = params
1059
1060     opts = objects.ImportExportOptions.FromDict(opts_s)
1061
1062     return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts,
1063                                            None, None,
1064                                            objects.Instance.FromDict(instance),
1065                                            component, dest,
1066                                            _DecodeImportExportIO(dest,
1067                                                                  dest_args))
1068
1069   @staticmethod
1070   def perspective_export_start(params):
1071     """Starts an export daemon.
1072
1073     """
1074     (opts_s, host, port, instance, component, (source, source_args)) = params
1075
1076     opts = objects.ImportExportOptions.FromDict(opts_s)
1077
1078     return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts,
1079                                            host, port,
1080                                            objects.Instance.FromDict(instance),
1081                                            component, source,
1082                                            _DecodeImportExportIO(source,
1083                                                                  source_args))
1084
1085   @staticmethod
1086   def perspective_impexp_status(params):
1087     """Retrieves the status of an import or export daemon.
1088
1089     """
1090     return backend.GetImportExportStatus(params[0])
1091
1092   @staticmethod
1093   def perspective_impexp_abort(params):
1094     """Aborts an import or export.
1095
1096     """
1097     return backend.AbortImportExport(params[0])
1098
1099   @staticmethod
1100   def perspective_impexp_cleanup(params):
1101     """Cleans up after an import or export.
1102
1103     """
1104     return backend.CleanupImportExport(params[0])
1105
1106
1107 def CheckNoded(_, args):
1108   """Initial checks whether to run or exit with a failure.
1109
1110   """
1111   if args: # noded doesn't take any arguments
1112     print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
1113                           sys.argv[0])
1114     sys.exit(constants.EXIT_FAILURE)
1115   try:
1116     codecs.lookup("string-escape")
1117   except LookupError:
1118     print >> sys.stderr, ("Can't load the string-escape code which is part"
1119                           " of the Python installation. Is your installation"
1120                           " complete/correct? Aborting.")
1121     sys.exit(constants.EXIT_FAILURE)
1122
1123
1124 def PrepNoded(options, _):
1125   """Preparation node daemon function, executed with the PID file held.
1126
1127   """
1128   if options.mlock:
1129     request_executor_class = MlockallRequestExecutor
1130     try:
1131       utils.Mlockall()
1132     except errors.NoCtypesError:
1133       logging.warning("Cannot set memory lock, ctypes module not found")
1134       request_executor_class = http.server.HttpServerRequestExecutor
1135   else:
1136     request_executor_class = http.server.HttpServerRequestExecutor
1137
1138   # Read SSL certificate
1139   if options.ssl:
1140     ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
1141                                     ssl_cert_path=options.ssl_cert)
1142   else:
1143     ssl_params = None
1144
1145   err = _PrepareQueueLock()
1146   if err is not None:
1147     # this might be some kind of file-system/permission error; while
1148     # this breaks the job queue functionality, we shouldn't prevent
1149     # startup of the whole node daemon because of this
1150     logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)
1151
1152   handler = NodeRequestHandler()
1153
1154   mainloop = daemon.Mainloop()
1155   server = \
1156     http.server.HttpServer(mainloop, options.bind_address, options.port,
1157                            handler, ssl_params=ssl_params, ssl_verify_peer=True,
1158                            request_executor_class=request_executor_class)
1159   server.Start()
1160
1161   return (mainloop, server)
1162
1163
1164 def ExecNoded(options, args, prep_data): # pylint: disable=W0613
1165   """Main node daemon function, executed with the PID file held.
1166
1167   """
1168   (mainloop, server) = prep_data
1169   try:
1170     mainloop.Run()
1171   finally:
1172     server.Stop()
1173
1174
1175 def Main():
1176   """Main function for the node daemon.
1177
1178   """
1179   parser = OptionParser(description="Ganeti node daemon",
1180                         usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
1181                         version="%%prog (ganeti) %s" %
1182                         constants.RELEASE_VERSION)
1183   parser.add_option("--no-mlock", dest="mlock",
1184                     help="Do not mlock the node memory in ram",
1185                     default=True, action="store_false")
1186
1187   daemon.GenericMain(constants.NODED, parser, CheckNoded, PrepNoded, ExecNoded,
1188                      default_ssl_cert=pathutils.NODED_CERT_FILE,
1189                      default_ssl_key=pathutils.NODED_CERT_FILE,
1190                      console_logging=True)