Add general storage parameters to node info call
[ganeti-local] / lib / server / noded.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti node daemon"""
23
24 # pylint: disable=C0103,W0142
25
26 # C0103: Functions in this module need to have a given name structure,
27 # and the name of the daemon doesn't match
28
29 # W0142: Used * or ** magic, since we do use it extensively in this
30 # module
31
32 import os
33 import sys
34 import logging
35 import signal
36 import codecs
37
38 from optparse import OptionParser
39
40 from ganeti import backend
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import errors
44 from ganeti import jstore
45 from ganeti import daemon
46 from ganeti import http
47 from ganeti import utils
48 from ganeti.storage import container
49 from ganeti import serializer
50 from ganeti import netutils
51 from ganeti import pathutils
52 from ganeti import ssconf
53
54 import ganeti.http.server # pylint: disable=W0611
55
56
57 queue_lock = None
58
59
60 def _extendReasonTrail(trail, source, reason=""):
61   """Extend the reason trail with noded information
62
63   The trail is extended by appending the name of the noded functionality
64   """
65   assert trail is not None
66   trail_source = "%s:%s" % (constants.OPCODE_REASON_SRC_NODED, source)
67   trail.append((trail_source, reason, utils.EpochNano()))
68
69
70 def _PrepareQueueLock():
71   """Try to prepare the queue lock.
72
73   @return: None for success, otherwise an exception object
74
75   """
76   global queue_lock # pylint: disable=W0603
77
78   if queue_lock is not None:
79     return None
80
81   # Prepare job queue
82   try:
83     queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
84     return None
85   except EnvironmentError, err:
86     return err
87
88
89 def _RequireJobQueueLock(fn):
90   """Decorator for job queue manipulating functions.
91
92   """
93   QUEUE_LOCK_TIMEOUT = 10
94
95   def wrapper(*args, **kwargs):
96     # Locking in exclusive, blocking mode because there could be several
97     # children running at the same time. Waiting up to 10 seconds.
98     if _PrepareQueueLock() is not None:
99       raise errors.JobQueueError("Job queue failed initialization,"
100                                  " cannot update jobs")
101     queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
102     try:
103       return fn(*args, **kwargs)
104     finally:
105       queue_lock.Unlock()
106
107   return wrapper
108
109
110 def _DecodeImportExportIO(ieio, ieioargs):
111   """Decodes import/export I/O information.
112
113   """
114   if ieio == constants.IEIO_RAW_DISK:
115     assert len(ieioargs) == 1
116     return (objects.Disk.FromDict(ieioargs[0]), )
117
118   if ieio == constants.IEIO_SCRIPT:
119     assert len(ieioargs) == 2
120     return (objects.Disk.FromDict(ieioargs[0]), ieioargs[1])
121
122   return ieioargs
123
124
125 def _DefaultAlternative(value, default):
126   """Returns value or, if evaluating to False, a default value.
127
128   Returns the given value, unless it evaluates to False. In the latter case the
129   default value is returned.
130
131   @param value: Value to return if it doesn't evaluate to False
132   @param default: Default value
133   @return: Given value or the default
134
135   """
136   if value:
137     return value
138
139   return default
140
141
142 class MlockallRequestExecutor(http.server.HttpServerRequestExecutor):
143   """Subclass ensuring request handlers are locked in RAM.
144
145   """
146   def __init__(self, *args, **kwargs):
147     utils.Mlockall()
148
149     http.server.HttpServerRequestExecutor.__init__(self, *args, **kwargs)
150
151
152 class NodeRequestHandler(http.server.HttpServerHandler):
153   """The server implementation.
154
155   This class holds all methods exposed over the RPC interface.
156
157   """
158   # too many public methods, and unused args - all methods get params
159   # due to the API
160   # pylint: disable=R0904,W0613
161   def __init__(self):
162     http.server.HttpServerHandler.__init__(self)
163     self.noded_pid = os.getpid()
164
165   def HandleRequest(self, req):
166     """Handle a request.
167
168     """
169     if req.request_method.upper() != http.HTTP_POST:
170       raise http.HttpBadRequest("Only the POST method is supported")
171
172     path = req.request_path
173     if path.startswith("/"):
174       path = path[1:]
175
176     method = getattr(self, "perspective_%s" % path, None)
177     if method is None:
178       raise http.HttpNotFound()
179
180     try:
181       result = (True, method(serializer.LoadJson(req.request_body)))
182
183     except backend.RPCFail, err:
184       # our custom failure exception; str(err) works fine if the
185       # exception was constructed with a single argument, and in
186       # this case, err.message == err.args[0] == str(err)
187       result = (False, str(err))
188     except errors.QuitGanetiException, err:
189       # Tell parent to quit
190       logging.info("Shutting down the node daemon, arguments: %s",
191                    str(err.args))
192       os.kill(self.noded_pid, signal.SIGTERM)
193       # And return the error's arguments, which must be already in
194       # correct tuple format
195       result = err.args
196     except Exception, err:
197       logging.exception("Error in RPC call")
198       result = (False, "Error while executing backend function: %s" % str(err))
199
200     return serializer.DumpJson(result)
201
202   # the new block devices  --------------------------
203
204   @staticmethod
205   def perspective_blockdev_create(params):
206     """Create a block device.
207
208     """
209     (bdev_s, size, owner, on_primary, info, excl_stor) = params
210     bdev = objects.Disk.FromDict(bdev_s)
211     if bdev is None:
212       raise ValueError("can't unserialize data!")
213     return backend.BlockdevCreate(bdev, size, owner, on_primary, info,
214                                   excl_stor)
215
216   @staticmethod
217   def perspective_blockdev_pause_resume_sync(params):
218     """Pause/resume sync of a block device.
219
220     """
221     disks_s, pause = params
222     disks = [objects.Disk.FromDict(bdev_s) for bdev_s in disks_s]
223     return backend.BlockdevPauseResumeSync(disks, pause)
224
225   @staticmethod
226   def perspective_blockdev_wipe(params):
227     """Wipe a block device.
228
229     """
230     bdev_s, offset, size = params
231     bdev = objects.Disk.FromDict(bdev_s)
232     return backend.BlockdevWipe(bdev, offset, size)
233
234   @staticmethod
235   def perspective_blockdev_remove(params):
236     """Remove a block device.
237
238     """
239     bdev_s = params[0]
240     bdev = objects.Disk.FromDict(bdev_s)
241     return backend.BlockdevRemove(bdev)
242
243   @staticmethod
244   def perspective_blockdev_rename(params):
245     """Remove a block device.
246
247     """
248     devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params[0]]
249     return backend.BlockdevRename(devlist)
250
251   @staticmethod
252   def perspective_blockdev_assemble(params):
253     """Assemble a block device.
254
255     """
256     bdev_s, owner, on_primary, idx = params
257     bdev = objects.Disk.FromDict(bdev_s)
258     if bdev is None:
259       raise ValueError("can't unserialize data!")
260     return backend.BlockdevAssemble(bdev, owner, on_primary, idx)
261
262   @staticmethod
263   def perspective_blockdev_shutdown(params):
264     """Shutdown a block device.
265
266     """
267     bdev_s = params[0]
268     bdev = objects.Disk.FromDict(bdev_s)
269     if bdev is None:
270       raise ValueError("can't unserialize data!")
271     return backend.BlockdevShutdown(bdev)
272
273   @staticmethod
274   def perspective_blockdev_addchildren(params):
275     """Add a child to a mirror device.
276
277     Note: this is only valid for mirror devices. It's the caller's duty
278     to send a correct disk, otherwise we raise an error.
279
280     """
281     bdev_s, ndev_s = params
282     bdev = objects.Disk.FromDict(bdev_s)
283     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
284     if bdev is None or ndevs.count(None) > 0:
285       raise ValueError("can't unserialize data!")
286     return backend.BlockdevAddchildren(bdev, ndevs)
287
288   @staticmethod
289   def perspective_blockdev_removechildren(params):
290     """Remove a child from a mirror device.
291
292     This is only valid for mirror devices, of course. It's the callers
293     duty to send a correct disk, otherwise we raise an error.
294
295     """
296     bdev_s, ndev_s = params
297     bdev = objects.Disk.FromDict(bdev_s)
298     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
299     if bdev is None or ndevs.count(None) > 0:
300       raise ValueError("can't unserialize data!")
301     return backend.BlockdevRemovechildren(bdev, ndevs)
302
303   @staticmethod
304   def perspective_blockdev_getmirrorstatus(params):
305     """Return the mirror status for a list of disks.
306
307     """
308     disks = [objects.Disk.FromDict(dsk_s)
309              for dsk_s in params[0]]
310     return [status.ToDict()
311             for status in backend.BlockdevGetmirrorstatus(disks)]
312
313   @staticmethod
314   def perspective_blockdev_getmirrorstatus_multi(params):
315     """Return the mirror status for a list of disks.
316
317     """
318     (node_disks, ) = params
319
320     disks = [objects.Disk.FromDict(dsk_s) for dsk_s in node_disks]
321
322     result = []
323
324     for (success, status) in backend.BlockdevGetmirrorstatusMulti(disks):
325       if success:
326         result.append((success, status.ToDict()))
327       else:
328         result.append((success, status))
329
330     return result
331
332   @staticmethod
333   def perspective_blockdev_find(params):
334     """Expose the FindBlockDevice functionality for a disk.
335
336     This will try to find but not activate a disk.
337
338     """
339     disk = objects.Disk.FromDict(params[0])
340
341     result = backend.BlockdevFind(disk)
342     if result is None:
343       return None
344
345     return result.ToDict()
346
347   @staticmethod
348   def perspective_blockdev_snapshot(params):
349     """Create a snapshot device.
350
351     Note that this is only valid for LVM disks, if we get passed
352     something else we raise an exception. The snapshot device can be
353     remove by calling the generic block device remove call.
354
355     """
356     cfbd = objects.Disk.FromDict(params[0])
357     return backend.BlockdevSnapshot(cfbd)
358
359   @staticmethod
360   def perspective_blockdev_grow(params):
361     """Grow a stack of devices.
362
363     """
364     if len(params) < 5:
365       raise ValueError("Received only %s parameters in blockdev_grow,"
366                        " old master?" % len(params))
367     cfbd = objects.Disk.FromDict(params[0])
368     amount = params[1]
369     dryrun = params[2]
370     backingstore = params[3]
371     excl_stor = params[4]
372     return backend.BlockdevGrow(cfbd, amount, dryrun, backingstore, excl_stor)
373
374   @staticmethod
375   def perspective_blockdev_close(params):
376     """Closes the given block devices.
377
378     """
379     disks = [objects.Disk.FromDict(cf) for cf in params[1]]
380     return backend.BlockdevClose(params[0], disks)
381
382   @staticmethod
383   def perspective_blockdev_getdimensions(params):
384     """Compute the sizes of the given block devices.
385
386     """
387     disks = [objects.Disk.FromDict(cf) for cf in params[0]]
388     return backend.BlockdevGetdimensions(disks)
389
390   @staticmethod
391   def perspective_blockdev_export(params):
392     """Compute the sizes of the given block devices.
393
394     """
395     disk = objects.Disk.FromDict(params[0])
396     dest_node, dest_path, cluster_name = params[1:]
397     return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)
398
399   @staticmethod
400   def perspective_blockdev_setinfo(params):
401     """Sets metadata information on the given block device.
402
403     """
404     (disk, info) = params
405     disk = objects.Disk.FromDict(disk)
406     return backend.BlockdevSetInfo(disk, info)
407
408   # blockdev/drbd specific methods ----------
409
410   @staticmethod
411   def perspective_drbd_disconnect_net(params):
412     """Disconnects the network connection of drbd disks.
413
414     Note that this is only valid for drbd disks, so the members of the
415     disk list must all be drbd devices.
416
417     """
418     nodes_ip, disks, target_node_uuid = params
419     disks = [objects.Disk.FromDict(cf) for cf in disks]
420     return backend.DrbdDisconnectNet(target_node_uuid, nodes_ip, disks)
421
422   @staticmethod
423   def perspective_drbd_attach_net(params):
424     """Attaches the network connection of drbd disks.
425
426     Note that this is only valid for drbd disks, so the members of the
427     disk list must all be drbd devices.
428
429     """
430     nodes_ip, disks, instance_name, multimaster, target_node_uuid = params
431     disks = [objects.Disk.FromDict(cf) for cf in disks]
432     return backend.DrbdAttachNet(target_node_uuid, nodes_ip, disks,
433                                  instance_name, multimaster)
434
435   @staticmethod
436   def perspective_drbd_wait_sync(params):
437     """Wait until DRBD disks are synched.
438
439     Note that this is only valid for drbd disks, so the members of the
440     disk list must all be drbd devices.
441
442     """
443     nodes_ip, disks, target_node_uuid = params
444     disks = [objects.Disk.FromDict(cf) for cf in disks]
445     return backend.DrbdWaitSync(target_node_uuid, nodes_ip, disks)
446
447   @staticmethod
448   def perspective_drbd_helper(params):
449     """Query drbd helper.
450
451     """
452     return backend.GetDrbdUsermodeHelper()
453
454   # export/import  --------------------------
455
456   @staticmethod
457   def perspective_finalize_export(params):
458     """Expose the finalize export functionality.
459
460     """
461     instance = objects.Instance.FromDict(params[0])
462
463     snap_disks = []
464     for disk in params[1]:
465       if isinstance(disk, bool):
466         snap_disks.append(disk)
467       else:
468         snap_disks.append(objects.Disk.FromDict(disk))
469
470     return backend.FinalizeExport(instance, snap_disks)
471
472   @staticmethod
473   def perspective_export_info(params):
474     """Query information about an existing export on this node.
475
476     The given path may not contain an export, in which case we return
477     None.
478
479     """
480     path = params[0]
481     return backend.ExportInfo(path)
482
483   @staticmethod
484   def perspective_export_list(params):
485     """List the available exports on this node.
486
487     Note that as opposed to export_info, which may query data about an
488     export in any path, this only queries the standard Ganeti path
489     (pathutils.EXPORT_DIR).
490
491     """
492     return backend.ListExports()
493
494   @staticmethod
495   def perspective_export_remove(params):
496     """Remove an export.
497
498     """
499     export = params[0]
500     return backend.RemoveExport(export)
501
502   # block device ---------------------
503   @staticmethod
504   def perspective_bdev_sizes(params):
505     """Query the list of block devices
506
507     """
508     devices = params[0]
509     return backend.GetBlockDevSizes(devices)
510
511   # volume  --------------------------
512
513   @staticmethod
514   def perspective_lv_list(params):
515     """Query the list of logical volumes in a given volume group.
516
517     """
518     vgname = params[0]
519     return backend.GetVolumeList(vgname)
520
521   @staticmethod
522   def perspective_vg_list(params):
523     """Query the list of volume groups.
524
525     """
526     return backend.ListVolumeGroups()
527
528   # Storage --------------------------
529
530   @staticmethod
531   def perspective_storage_list(params):
532     """Get list of storage units.
533
534     """
535     (su_name, su_args, name, fields) = params
536     return container.GetStorage(su_name, *su_args).List(name, fields)
537
538   @staticmethod
539   def perspective_storage_modify(params):
540     """Modify a storage unit.
541
542     """
543     (su_name, su_args, name, changes) = params
544     return container.GetStorage(su_name, *su_args).Modify(name, changes)
545
546   @staticmethod
547   def perspective_storage_execute(params):
548     """Execute an operation on a storage unit.
549
550     """
551     (su_name, su_args, name, op) = params
552     return container.GetStorage(su_name, *su_args).Execute(name, op)
553
554   # bridge  --------------------------
555
556   @staticmethod
557   def perspective_bridges_exist(params):
558     """Check if all bridges given exist on this node.
559
560     """
561     bridges_list = params[0]
562     return backend.BridgesExist(bridges_list)
563
564   # instance  --------------------------
565
566   @staticmethod
567   def perspective_instance_os_add(params):
568     """Install an OS on a given instance.
569
570     """
571     inst_s = params[0]
572     inst = objects.Instance.FromDict(inst_s)
573     reinstall = params[1]
574     debug = params[2]
575     return backend.InstanceOsAdd(inst, reinstall, debug)
576
577   @staticmethod
578   def perspective_instance_run_rename(params):
579     """Runs the OS rename script for an instance.
580
581     """
582     inst_s, old_name, debug = params
583     inst = objects.Instance.FromDict(inst_s)
584     return backend.RunRenameInstance(inst, old_name, debug)
585
586   @staticmethod
587   def perspective_instance_shutdown(params):
588     """Shutdown an instance.
589
590     """
591     instance = objects.Instance.FromDict(params[0])
592     timeout = params[1]
593     trail = params[2]
594     _extendReasonTrail(trail, "shutdown")
595     return backend.InstanceShutdown(instance, timeout, trail)
596
597   @staticmethod
598   def perspective_instance_start(params):
599     """Start an instance.
600
601     """
602     (instance_name, startup_paused, trail) = params
603     instance = objects.Instance.FromDict(instance_name)
604     _extendReasonTrail(trail, "start")
605     return backend.StartInstance(instance, startup_paused, trail)
606
607   @staticmethod
608   def perspective_migration_info(params):
609     """Gather information about an instance to be migrated.
610
611     """
612     instance = objects.Instance.FromDict(params[0])
613     return backend.MigrationInfo(instance)
614
615   @staticmethod
616   def perspective_accept_instance(params):
617     """Prepare the node to accept an instance.
618
619     """
620     instance, info, target = params
621     instance = objects.Instance.FromDict(instance)
622     return backend.AcceptInstance(instance, info, target)
623
624   @staticmethod
625   def perspective_instance_finalize_migration_dst(params):
626     """Finalize the instance migration on the destination node.
627
628     """
629     instance, info, success = params
630     instance = objects.Instance.FromDict(instance)
631     return backend.FinalizeMigrationDst(instance, info, success)
632
633   @staticmethod
634   def perspective_instance_migrate(params):
635     """Migrates an instance.
636
637     """
638     cluster_name, instance, target, live = params
639     instance = objects.Instance.FromDict(instance)
640     return backend.MigrateInstance(cluster_name, instance, target, live)
641
642   @staticmethod
643   def perspective_instance_finalize_migration_src(params):
644     """Finalize the instance migration on the source node.
645
646     """
647     instance, success, live = params
648     instance = objects.Instance.FromDict(instance)
649     return backend.FinalizeMigrationSource(instance, success, live)
650
651   @staticmethod
652   def perspective_instance_get_migration_status(params):
653     """Reports migration status.
654
655     """
656     instance = objects.Instance.FromDict(params[0])
657     return backend.GetMigrationStatus(instance).ToDict()
658
659   @staticmethod
660   def perspective_instance_reboot(params):
661     """Reboot an instance.
662
663     """
664     instance = objects.Instance.FromDict(params[0])
665     reboot_type = params[1]
666     shutdown_timeout = params[2]
667     trail = params[3]
668     _extendReasonTrail(trail, "reboot")
669     return backend.InstanceReboot(instance, reboot_type, shutdown_timeout,
670                                   trail)
671
672   @staticmethod
673   def perspective_instance_balloon_memory(params):
674     """Modify instance runtime memory.
675
676     """
677     instance_dict, memory = params
678     instance = objects.Instance.FromDict(instance_dict)
679     return backend.InstanceBalloonMemory(instance, memory)
680
681   @staticmethod
682   def perspective_instance_info(params):
683     """Query instance information.
684
685     """
686     (instance_name, hypervisor_name, hvparams) = params
687     return backend.GetInstanceInfo(instance_name, hypervisor_name, hvparams)
688
689   @staticmethod
690   def perspective_instance_migratable(params):
691     """Query whether the specified instance can be migrated.
692
693     """
694     instance = objects.Instance.FromDict(params[0])
695     return backend.GetInstanceMigratable(instance)
696
697   @staticmethod
698   def perspective_all_instances_info(params):
699     """Query information about all instances.
700
701     """
702     (hypervisor_list, all_hvparams) = params
703     return backend.GetAllInstancesInfo(hypervisor_list, all_hvparams)
704
705   @staticmethod
706   def perspective_instance_list(params):
707     """Query the list of running instances.
708
709     """
710     (hypervisor_list, hvparams) = params
711     return backend.GetInstanceList(hypervisor_list, hvparams)
712
713   # node --------------------------
714
715   @staticmethod
716   def perspective_node_has_ip_address(params):
717     """Checks if a node has the given ip address.
718
719     """
720     return netutils.IPAddress.Own(params[0])
721
722   @staticmethod
723   def perspective_node_info(params):
724     """Query node information.
725
726     """
727     # FIXME: remove the fallback to excl_stor once all callers are
728     # adjusted
729     if (len(params) == 3):
730       (legacy_storage_units, hv_specs, excl_stor) = params
731       storage_units = NodeRequestHandler._ConvertExclStorage(
732           legacy_storage_units, excl_stor)
733     else:
734       (storage_units, hv_specs) = params
735     return backend.GetNodeInfo(storage_units, hv_specs)
736
737   @staticmethod
738   def _ConvertExclStorage(storage_units, excl_stor):
739     result_units = []
740     for (storage_type, storage_key) in storage_units:
741       if storage_type in [constants.ST_LVM_VG, constants.ST_LVM_PV]:
742         result_units.append((storage_type, storage_key, [excl_stor]))
743       else:
744         result_units.append((storage_type, storage_key, []))
745     return result_units
746
747   @staticmethod
748   def perspective_etc_hosts_modify(params):
749     """Modify a node entry in /etc/hosts.
750
751     """
752     backend.EtcHostsModify(params[0], params[1], params[2])
753
754     return True
755
756   @staticmethod
757   def perspective_node_verify(params):
758     """Run a verify sequence on this node.
759
760     """
761     (what, cluster_name, hvparams) = params
762     return backend.VerifyNode(what, cluster_name, hvparams)
763
764   @classmethod
765   def perspective_node_verify_light(cls, params):
766     """Run a light verify sequence on this node.
767
768     """
769     # So far it's the same as the normal node_verify
770     return cls.perspective_node_verify(params)
771
772   @staticmethod
773   def perspective_node_start_master_daemons(params):
774     """Start the master daemons on this node.
775
776     """
777     return backend.StartMasterDaemons(params[0])
778
779   @staticmethod
780   def perspective_node_activate_master_ip(params):
781     """Activate the master IP on this node.
782
783     """
784     master_params = objects.MasterNetworkParameters.FromDict(params[0])
785     return backend.ActivateMasterIp(master_params, params[1])
786
787   @staticmethod
788   def perspective_node_deactivate_master_ip(params):
789     """Deactivate the master IP on this node.
790
791     """
792     master_params = objects.MasterNetworkParameters.FromDict(params[0])
793     return backend.DeactivateMasterIp(master_params, params[1])
794
795   @staticmethod
796   def perspective_node_stop_master(params):
797     """Stops master daemons on this node.
798
799     """
800     return backend.StopMasterDaemons()
801
802   @staticmethod
803   def perspective_node_change_master_netmask(params):
804     """Change the master IP netmask.
805
806     """
807     return backend.ChangeMasterNetmask(params[0], params[1], params[2],
808                                        params[3])
809
810   @staticmethod
811   def perspective_node_leave_cluster(params):
812     """Cleanup after leaving a cluster.
813
814     """
815     return backend.LeaveCluster(params[0])
816
817   @staticmethod
818   def perspective_node_volumes(params):
819     """Query the list of all logical volume groups.
820
821     """
822     return backend.NodeVolumes()
823
824   @staticmethod
825   def perspective_node_demote_from_mc(params):
826     """Demote a node from the master candidate role.
827
828     """
829     return backend.DemoteFromMC()
830
831   @staticmethod
832   def perspective_node_powercycle(params):
833     """Tries to powercycle the nod.
834
835     """
836     (hypervisor_type, hvparams) = params
837     return backend.PowercycleNode(hypervisor_type, hvparams)
838
839   # cluster --------------------------
840
841   @staticmethod
842   def perspective_version(params):
843     """Query version information.
844
845     """
846     return constants.PROTOCOL_VERSION
847
848   @staticmethod
849   def perspective_upload_file(params):
850     """Upload a file.
851
852     Note that the backend implementation imposes strict rules on which
853     files are accepted.
854
855     """
856     return backend.UploadFile(*(params[0]))
857
858   @staticmethod
859   def perspective_master_info(params):
860     """Query master information.
861
862     """
863     return backend.GetMasterInfo()
864
865   @staticmethod
866   def perspective_run_oob(params):
867     """Runs oob on node.
868
869     """
870     output = backend.RunOob(params[0], params[1], params[2], params[3])
871     if output:
872       result = serializer.LoadJson(output)
873     else:
874       result = None
875     return result
876
877   @staticmethod
878   def perspective_restricted_command(params):
879     """Runs a restricted command.
880
881     """
882     (cmd, ) = params
883
884     return backend.RunRestrictedCmd(cmd)
885
886   @staticmethod
887   def perspective_write_ssconf_files(params):
888     """Write ssconf files.
889
890     """
891     (values,) = params
892     return ssconf.WriteSsconfFiles(values)
893
894   @staticmethod
895   def perspective_get_watcher_pause(params):
896     """Get watcher pause end.
897
898     """
899     return utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE)
900
901   @staticmethod
902   def perspective_set_watcher_pause(params):
903     """Set watcher pause.
904
905     """
906     (until, ) = params
907     return backend.SetWatcherPause(until)
908
909   # os -----------------------
910
911   @staticmethod
912   def perspective_os_diagnose(params):
913     """Query detailed information about existing OSes.
914
915     """
916     return backend.DiagnoseOS()
917
918   @staticmethod
919   def perspective_os_get(params):
920     """Query information about a given OS.
921
922     """
923     name = params[0]
924     os_obj = backend.OSFromDisk(name)
925     return os_obj.ToDict()
926
927   @staticmethod
928   def perspective_os_validate(params):
929     """Run a given OS' validation routine.
930
931     """
932     required, name, checks, params = params
933     return backend.ValidateOS(required, name, checks, params)
934
935   # extstorage -----------------------
936
937   @staticmethod
938   def perspective_extstorage_diagnose(params):
939     """Query detailed information about existing extstorage providers.
940
941     """
942     return backend.DiagnoseExtStorage()
943
944   # hooks -----------------------
945
946   @staticmethod
947   def perspective_hooks_runner(params):
948     """Run hook scripts.
949
950     """
951     hpath, phase, env = params
952     hr = backend.HooksRunner()
953     return hr.RunHooks(hpath, phase, env)
954
955   # iallocator -----------------
956
957   @staticmethod
958   def perspective_iallocator_runner(params):
959     """Run an iallocator script.
960
961     """
962     name, idata = params
963     iar = backend.IAllocatorRunner()
964     return iar.Run(name, idata)
965
966   # test -----------------------
967
968   @staticmethod
969   def perspective_test_delay(params):
970     """Run test delay.
971
972     """
973     duration = params[0]
974     status, rval = utils.TestDelay(duration)
975     if not status:
976       raise backend.RPCFail(rval)
977     return rval
978
979   # file storage ---------------
980
981   @staticmethod
982   def perspective_file_storage_dir_create(params):
983     """Create the file storage directory.
984
985     """
986     file_storage_dir = params[0]
987     return backend.CreateFileStorageDir(file_storage_dir)
988
989   @staticmethod
990   def perspective_file_storage_dir_remove(params):
991     """Remove the file storage directory.
992
993     """
994     file_storage_dir = params[0]
995     return backend.RemoveFileStorageDir(file_storage_dir)
996
997   @staticmethod
998   def perspective_file_storage_dir_rename(params):
999     """Rename the file storage directory.
1000
1001     """
1002     old_file_storage_dir = params[0]
1003     new_file_storage_dir = params[1]
1004     return backend.RenameFileStorageDir(old_file_storage_dir,
1005                                         new_file_storage_dir)
1006
1007   # jobs ------------------------
1008
1009   @staticmethod
1010   @_RequireJobQueueLock
1011   def perspective_jobqueue_update(params):
1012     """Update job queue.
1013
1014     """
1015     (file_name, content) = params
1016     return backend.JobQueueUpdate(file_name, content)
1017
1018   @staticmethod
1019   @_RequireJobQueueLock
1020   def perspective_jobqueue_purge(params):
1021     """Purge job queue.
1022
1023     """
1024     return backend.JobQueuePurge()
1025
1026   @staticmethod
1027   @_RequireJobQueueLock
1028   def perspective_jobqueue_rename(params):
1029     """Rename a job queue file.
1030
1031     """
1032     # TODO: What if a file fails to rename?
1033     return [backend.JobQueueRename(old, new) for old, new in params[0]]
1034
1035   @staticmethod
1036   @_RequireJobQueueLock
1037   def perspective_jobqueue_set_drain_flag(params):
1038     """Set job queue's drain flag.
1039
1040     """
1041     (flag, ) = params
1042
1043     return jstore.SetDrainFlag(flag)
1044
1045   # hypervisor ---------------
1046
1047   @staticmethod
1048   def perspective_hypervisor_validate_params(params):
1049     """Validate the hypervisor parameters.
1050
1051     """
1052     (hvname, hvparams) = params
1053     return backend.ValidateHVParams(hvname, hvparams)
1054
1055   # Crypto
1056
1057   @staticmethod
1058   def perspective_x509_cert_create(params):
1059     """Creates a new X509 certificate for SSL/TLS.
1060
1061     """
1062     (validity, ) = params
1063     return backend.CreateX509Certificate(validity)
1064
1065   @staticmethod
1066   def perspective_x509_cert_remove(params):
1067     """Removes a X509 certificate.
1068
1069     """
1070     (name, ) = params
1071     return backend.RemoveX509Certificate(name)
1072
1073   # Import and export
1074
1075   @staticmethod
1076   def perspective_import_start(params):
1077     """Starts an import daemon.
1078
1079     """
1080     (opts_s, instance, component, (dest, dest_args)) = params
1081
1082     opts = objects.ImportExportOptions.FromDict(opts_s)
1083
1084     return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts,
1085                                            None, None,
1086                                            objects.Instance.FromDict(instance),
1087                                            component, dest,
1088                                            _DecodeImportExportIO(dest,
1089                                                                  dest_args))
1090
1091   @staticmethod
1092   def perspective_export_start(params):
1093     """Starts an export daemon.
1094
1095     """
1096     (opts_s, host, port, instance, component, (source, source_args)) = params
1097
1098     opts = objects.ImportExportOptions.FromDict(opts_s)
1099
1100     return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts,
1101                                            host, port,
1102                                            objects.Instance.FromDict(instance),
1103                                            component, source,
1104                                            _DecodeImportExportIO(source,
1105                                                                  source_args))
1106
1107   @staticmethod
1108   def perspective_impexp_status(params):
1109     """Retrieves the status of an import or export daemon.
1110
1111     """
1112     return backend.GetImportExportStatus(params[0])
1113
1114   @staticmethod
1115   def perspective_impexp_abort(params):
1116     """Aborts an import or export.
1117
1118     """
1119     return backend.AbortImportExport(params[0])
1120
1121   @staticmethod
1122   def perspective_impexp_cleanup(params):
1123     """Cleans up after an import or export.
1124
1125     """
1126     return backend.CleanupImportExport(params[0])
1127
1128
1129 def CheckNoded(_, args):
1130   """Initial checks whether to run or exit with a failure.
1131
1132   """
1133   if args: # noded doesn't take any arguments
1134     print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
1135                           sys.argv[0])
1136     sys.exit(constants.EXIT_FAILURE)
1137   try:
1138     codecs.lookup("string-escape")
1139   except LookupError:
1140     print >> sys.stderr, ("Can't load the string-escape code which is part"
1141                           " of the Python installation. Is your installation"
1142                           " complete/correct? Aborting.")
1143     sys.exit(constants.EXIT_FAILURE)
1144
1145
1146 def PrepNoded(options, _):
1147   """Preparation node daemon function, executed with the PID file held.
1148
1149   """
1150   if options.mlock:
1151     request_executor_class = MlockallRequestExecutor
1152     try:
1153       utils.Mlockall()
1154     except errors.NoCtypesError:
1155       logging.warning("Cannot set memory lock, ctypes module not found")
1156       request_executor_class = http.server.HttpServerRequestExecutor
1157   else:
1158     request_executor_class = http.server.HttpServerRequestExecutor
1159
1160   # Read SSL certificate
1161   if options.ssl:
1162     ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
1163                                     ssl_cert_path=options.ssl_cert)
1164   else:
1165     ssl_params = None
1166
1167   err = _PrepareQueueLock()
1168   if err is not None:
1169     # this might be some kind of file-system/permission error; while
1170     # this breaks the job queue functionality, we shouldn't prevent
1171     # startup of the whole node daemon because of this
1172     logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)
1173
1174   handler = NodeRequestHandler()
1175
1176   mainloop = daemon.Mainloop()
1177   server = \
1178     http.server.HttpServer(mainloop, options.bind_address, options.port,
1179                            handler, ssl_params=ssl_params, ssl_verify_peer=True,
1180                            request_executor_class=request_executor_class)
1181   server.Start()
1182
1183   return (mainloop, server)
1184
1185
1186 def ExecNoded(options, args, prep_data): # pylint: disable=W0613
1187   """Main node daemon function, executed with the PID file held.
1188
1189   """
1190   (mainloop, server) = prep_data
1191   try:
1192     mainloop.Run()
1193   finally:
1194     server.Stop()
1195
1196
1197 def Main():
1198   """Main function for the node daemon.
1199
1200   """
1201   parser = OptionParser(description="Ganeti node daemon",
1202                         usage="%prog [-f] [-d] [-p port] [-b ADDRESS]\
1203                               \ [-i INTERFACE]",
1204                         version="%%prog (ganeti) %s" %
1205                         constants.RELEASE_VERSION)
1206   parser.add_option("--no-mlock", dest="mlock",
1207                     help="Do not mlock the node memory in ram",
1208                     default=True, action="store_false")
1209
1210   daemon.GenericMain(constants.NODED, parser, CheckNoded, PrepNoded, ExecNoded,
1211                      default_ssl_cert=pathutils.NODED_CERT_FILE,
1212                      default_ssl_key=pathutils.NODED_CERT_FILE,
1213                      console_logging=True)