Limit the PVs used for disk growth with exclusive storage
[ganeti-local] / lib / server / noded.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti node daemon"""
23
24 # pylint: disable=C0103,W0142
25
26 # C0103: Functions in this module need to have a given name structure,
27 # and the name of the daemon doesn't match
28
29 # W0142: Used * or ** magic, since we do use it extensively in this
30 # module
31
32 import os
33 import sys
34 import logging
35 import signal
36 import codecs
37
38 from optparse import OptionParser
39
40 from ganeti import backend
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import errors
44 from ganeti import jstore
45 from ganeti import daemon
46 from ganeti import http
47 from ganeti import utils
48 from ganeti.storage import container
49 from ganeti import serializer
50 from ganeti import netutils
51 from ganeti import pathutils
52 from ganeti import ssconf
53
54 import ganeti.http.server # pylint: disable=W0611
55
56
57 queue_lock = None
58
59
60 def _extendReasonTrail(trail, source, reason=""):
61   """Extend the reason trail with noded information
62
63   The trail is extended by appending the name of the noded functionality
64   """
65   assert trail is not None
66   trail_source = "%s:%s" % (constants.OPCODE_REASON_SRC_NODED, source)
67   trail.append((trail_source, reason, utils.EpochNano()))
68
69
70 def _PrepareQueueLock():
71   """Try to prepare the queue lock.
72
73   @return: None for success, otherwise an exception object
74
75   """
76   global queue_lock # pylint: disable=W0603
77
78   if queue_lock is not None:
79     return None
80
81   # Prepare job queue
82   try:
83     queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
84     return None
85   except EnvironmentError, err:
86     return err
87
88
89 def _RequireJobQueueLock(fn):
90   """Decorator for job queue manipulating functions.
91
92   """
93   QUEUE_LOCK_TIMEOUT = 10
94
95   def wrapper(*args, **kwargs):
96     # Locking in exclusive, blocking mode because there could be several
97     # children running at the same time. Waiting up to 10 seconds.
98     if _PrepareQueueLock() is not None:
99       raise errors.JobQueueError("Job queue failed initialization,"
100                                  " cannot update jobs")
101     queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
102     try:
103       return fn(*args, **kwargs)
104     finally:
105       queue_lock.Unlock()
106
107   return wrapper
108
109
110 def _DecodeImportExportIO(ieio, ieioargs):
111   """Decodes import/export I/O information.
112
113   """
114   if ieio == constants.IEIO_RAW_DISK:
115     assert len(ieioargs) == 1
116     return (objects.Disk.FromDict(ieioargs[0]), )
117
118   if ieio == constants.IEIO_SCRIPT:
119     assert len(ieioargs) == 2
120     return (objects.Disk.FromDict(ieioargs[0]), ieioargs[1])
121
122   return ieioargs
123
124
125 def _DefaultAlternative(value, default):
126   """Returns value or, if evaluating to False, a default value.
127
128   Returns the given value, unless it evaluates to False. In the latter case the
129   default value is returned.
130
131   @param value: Value to return if it doesn't evaluate to False
132   @param default: Default value
133   @return: Given value or the default
134
135   """
136   if value:
137     return value
138
139   return default
140
141
142 class MlockallRequestExecutor(http.server.HttpServerRequestExecutor):
143   """Subclass ensuring request handlers are locked in RAM.
144
145   """
146   def __init__(self, *args, **kwargs):
147     utils.Mlockall()
148
149     http.server.HttpServerRequestExecutor.__init__(self, *args, **kwargs)
150
151
152 class NodeRequestHandler(http.server.HttpServerHandler):
153   """The server implementation.
154
155   This class holds all methods exposed over the RPC interface.
156
157   """
158   # too many public methods, and unused args - all methods get params
159   # due to the API
160   # pylint: disable=R0904,W0613
161   def __init__(self):
162     http.server.HttpServerHandler.__init__(self)
163     self.noded_pid = os.getpid()
164
165   def HandleRequest(self, req):
166     """Handle a request.
167
168     """
169     if req.request_method.upper() != http.HTTP_POST:
170       raise http.HttpBadRequest("Only the POST method is supported")
171
172     path = req.request_path
173     if path.startswith("/"):
174       path = path[1:]
175
176     method = getattr(self, "perspective_%s" % path, None)
177     if method is None:
178       raise http.HttpNotFound()
179
180     try:
181       result = (True, method(serializer.LoadJson(req.request_body)))
182
183     except backend.RPCFail, err:
184       # our custom failure exception; str(err) works fine if the
185       # exception was constructed with a single argument, and in
186       # this case, err.message == err.args[0] == str(err)
187       result = (False, str(err))
188     except errors.QuitGanetiException, err:
189       # Tell parent to quit
190       logging.info("Shutting down the node daemon, arguments: %s",
191                    str(err.args))
192       os.kill(self.noded_pid, signal.SIGTERM)
193       # And return the error's arguments, which must be already in
194       # correct tuple format
195       result = err.args
196     except Exception, err:
197       logging.exception("Error in RPC call")
198       result = (False, "Error while executing backend function: %s" % str(err))
199
200     return serializer.DumpJson(result)
201
202   # the new block devices  --------------------------
203
204   @staticmethod
205   def perspective_blockdev_create(params):
206     """Create a block device.
207
208     """
209     (bdev_s, size, owner, on_primary, info, excl_stor) = params
210     bdev = objects.Disk.FromDict(bdev_s)
211     if bdev is None:
212       raise ValueError("can't unserialize data!")
213     return backend.BlockdevCreate(bdev, size, owner, on_primary, info,
214                                   excl_stor)
215
216   @staticmethod
217   def perspective_blockdev_pause_resume_sync(params):
218     """Pause/resume sync of a block device.
219
220     """
221     disks_s, pause = params
222     disks = [objects.Disk.FromDict(bdev_s) for bdev_s in disks_s]
223     return backend.BlockdevPauseResumeSync(disks, pause)
224
225   @staticmethod
226   def perspective_blockdev_wipe(params):
227     """Wipe a block device.
228
229     """
230     bdev_s, offset, size = params
231     bdev = objects.Disk.FromDict(bdev_s)
232     return backend.BlockdevWipe(bdev, offset, size)
233
234   @staticmethod
235   def perspective_blockdev_remove(params):
236     """Remove a block device.
237
238     """
239     bdev_s = params[0]
240     bdev = objects.Disk.FromDict(bdev_s)
241     return backend.BlockdevRemove(bdev)
242
243   @staticmethod
244   def perspective_blockdev_rename(params):
245     """Remove a block device.
246
247     """
248     devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params[0]]
249     return backend.BlockdevRename(devlist)
250
251   @staticmethod
252   def perspective_blockdev_assemble(params):
253     """Assemble a block device.
254
255     """
256     bdev_s, owner, on_primary, idx = params
257     bdev = objects.Disk.FromDict(bdev_s)
258     if bdev is None:
259       raise ValueError("can't unserialize data!")
260     return backend.BlockdevAssemble(bdev, owner, on_primary, idx)
261
262   @staticmethod
263   def perspective_blockdev_shutdown(params):
264     """Shutdown a block device.
265
266     """
267     bdev_s = params[0]
268     bdev = objects.Disk.FromDict(bdev_s)
269     if bdev is None:
270       raise ValueError("can't unserialize data!")
271     return backend.BlockdevShutdown(bdev)
272
273   @staticmethod
274   def perspective_blockdev_addchildren(params):
275     """Add a child to a mirror device.
276
277     Note: this is only valid for mirror devices. It's the caller's duty
278     to send a correct disk, otherwise we raise an error.
279
280     """
281     bdev_s, ndev_s = params
282     bdev = objects.Disk.FromDict(bdev_s)
283     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
284     if bdev is None or ndevs.count(None) > 0:
285       raise ValueError("can't unserialize data!")
286     return backend.BlockdevAddchildren(bdev, ndevs)
287
288   @staticmethod
289   def perspective_blockdev_removechildren(params):
290     """Remove a child from a mirror device.
291
292     This is only valid for mirror devices, of course. It's the callers
293     duty to send a correct disk, otherwise we raise an error.
294
295     """
296     bdev_s, ndev_s = params
297     bdev = objects.Disk.FromDict(bdev_s)
298     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
299     if bdev is None or ndevs.count(None) > 0:
300       raise ValueError("can't unserialize data!")
301     return backend.BlockdevRemovechildren(bdev, ndevs)
302
303   @staticmethod
304   def perspective_blockdev_getmirrorstatus(params):
305     """Return the mirror status for a list of disks.
306
307     """
308     disks = [objects.Disk.FromDict(dsk_s)
309              for dsk_s in params[0]]
310     return [status.ToDict()
311             for status in backend.BlockdevGetmirrorstatus(disks)]
312
313   @staticmethod
314   def perspective_blockdev_getmirrorstatus_multi(params):
315     """Return the mirror status for a list of disks.
316
317     """
318     (node_disks, ) = params
319
320     disks = [objects.Disk.FromDict(dsk_s) for dsk_s in node_disks]
321
322     result = []
323
324     for (success, status) in backend.BlockdevGetmirrorstatusMulti(disks):
325       if success:
326         result.append((success, status.ToDict()))
327       else:
328         result.append((success, status))
329
330     return result
331
332   @staticmethod
333   def perspective_blockdev_find(params):
334     """Expose the FindBlockDevice functionality for a disk.
335
336     This will try to find but not activate a disk.
337
338     """
339     disk = objects.Disk.FromDict(params[0])
340
341     result = backend.BlockdevFind(disk)
342     if result is None:
343       return None
344
345     return result.ToDict()
346
347   @staticmethod
348   def perspective_blockdev_snapshot(params):
349     """Create a snapshot device.
350
351     Note that this is only valid for LVM disks, if we get passed
352     something else we raise an exception. The snapshot device can be
353     remove by calling the generic block device remove call.
354
355     """
356     cfbd = objects.Disk.FromDict(params[0])
357     return backend.BlockdevSnapshot(cfbd)
358
359   @staticmethod
360   def perspective_blockdev_grow(params):
361     """Grow a stack of devices.
362
363     """
364     if len(params) < 5:
365       raise ValueError("Received only %s parameters in blockdev_grow,"
366                        " old master?" % len(params))
367     cfbd = objects.Disk.FromDict(params[0])
368     amount = params[1]
369     dryrun = params[2]
370     backingstore = params[3]
371     excl_stor = params[4]
372     return backend.BlockdevGrow(cfbd, amount, dryrun, backingstore, excl_stor)
373
374   @staticmethod
375   def perspective_blockdev_close(params):
376     """Closes the given block devices.
377
378     """
379     disks = [objects.Disk.FromDict(cf) for cf in params[1]]
380     return backend.BlockdevClose(params[0], disks)
381
382   @staticmethod
383   def perspective_blockdev_getdimensions(params):
384     """Compute the sizes of the given block devices.
385
386     """
387     disks = [objects.Disk.FromDict(cf) for cf in params[0]]
388     return backend.BlockdevGetdimensions(disks)
389
390   @staticmethod
391   def perspective_blockdev_export(params):
392     """Compute the sizes of the given block devices.
393
394     """
395     disk = objects.Disk.FromDict(params[0])
396     dest_node, dest_path, cluster_name = params[1:]
397     return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)
398
399   @staticmethod
400   def perspective_blockdev_setinfo(params):
401     """Sets metadata information on the given block device.
402
403     """
404     (disk, info) = params
405     disk = objects.Disk.FromDict(disk)
406     return backend.BlockdevSetInfo(disk, info)
407
408   # blockdev/drbd specific methods ----------
409
410   @staticmethod
411   def perspective_drbd_disconnect_net(params):
412     """Disconnects the network connection of drbd disks.
413
414     Note that this is only valid for drbd disks, so the members of the
415     disk list must all be drbd devices.
416
417     """
418     nodes_ip, disks, target_node_uuid = params
419     disks = [objects.Disk.FromDict(cf) for cf in disks]
420     return backend.DrbdDisconnectNet(target_node_uuid, nodes_ip, disks)
421
422   @staticmethod
423   def perspective_drbd_attach_net(params):
424     """Attaches the network connection of drbd disks.
425
426     Note that this is only valid for drbd disks, so the members of the
427     disk list must all be drbd devices.
428
429     """
430     nodes_ip, disks, instance_name, multimaster, target_node_uuid = params
431     disks = [objects.Disk.FromDict(cf) for cf in disks]
432     return backend.DrbdAttachNet(target_node_uuid, nodes_ip, disks,
433                                  instance_name, multimaster)
434
435   @staticmethod
436   def perspective_drbd_wait_sync(params):
437     """Wait until DRBD disks are synched.
438
439     Note that this is only valid for drbd disks, so the members of the
440     disk list must all be drbd devices.
441
442     """
443     nodes_ip, disks, target_node_uuid = params
444     disks = [objects.Disk.FromDict(cf) for cf in disks]
445     return backend.DrbdWaitSync(target_node_uuid, nodes_ip, disks)
446
447   @staticmethod
448   def perspective_drbd_helper(params):
449     """Query drbd helper.
450
451     """
452     return backend.GetDrbdUsermodeHelper()
453
454   # export/import  --------------------------
455
456   @staticmethod
457   def perspective_finalize_export(params):
458     """Expose the finalize export functionality.
459
460     """
461     instance = objects.Instance.FromDict(params[0])
462
463     snap_disks = []
464     for disk in params[1]:
465       if isinstance(disk, bool):
466         snap_disks.append(disk)
467       else:
468         snap_disks.append(objects.Disk.FromDict(disk))
469
470     return backend.FinalizeExport(instance, snap_disks)
471
472   @staticmethod
473   def perspective_export_info(params):
474     """Query information about an existing export on this node.
475
476     The given path may not contain an export, in which case we return
477     None.
478
479     """
480     path = params[0]
481     return backend.ExportInfo(path)
482
483   @staticmethod
484   def perspective_export_list(params):
485     """List the available exports on this node.
486
487     Note that as opposed to export_info, which may query data about an
488     export in any path, this only queries the standard Ganeti path
489     (pathutils.EXPORT_DIR).
490
491     """
492     return backend.ListExports()
493
494   @staticmethod
495   def perspective_export_remove(params):
496     """Remove an export.
497
498     """
499     export = params[0]
500     return backend.RemoveExport(export)
501
502   # block device ---------------------
503   @staticmethod
504   def perspective_bdev_sizes(params):
505     """Query the list of block devices
506
507     """
508     devices = params[0]
509     return backend.GetBlockDevSizes(devices)
510
511   # volume  --------------------------
512
513   @staticmethod
514   def perspective_lv_list(params):
515     """Query the list of logical volumes in a given volume group.
516
517     """
518     vgname = params[0]
519     return backend.GetVolumeList(vgname)
520
521   @staticmethod
522   def perspective_vg_list(params):
523     """Query the list of volume groups.
524
525     """
526     return backend.ListVolumeGroups()
527
528   # Storage --------------------------
529
530   @staticmethod
531   def perspective_storage_list(params):
532     """Get list of storage units.
533
534     """
535     (su_name, su_args, name, fields) = params
536     return container.GetStorage(su_name, *su_args).List(name, fields)
537
538   @staticmethod
539   def perspective_storage_modify(params):
540     """Modify a storage unit.
541
542     """
543     (su_name, su_args, name, changes) = params
544     return container.GetStorage(su_name, *su_args).Modify(name, changes)
545
546   @staticmethod
547   def perspective_storage_execute(params):
548     """Execute an operation on a storage unit.
549
550     """
551     (su_name, su_args, name, op) = params
552     return container.GetStorage(su_name, *su_args).Execute(name, op)
553
554   # bridge  --------------------------
555
556   @staticmethod
557   def perspective_bridges_exist(params):
558     """Check if all bridges given exist on this node.
559
560     """
561     bridges_list = params[0]
562     return backend.BridgesExist(bridges_list)
563
564   # instance  --------------------------
565
566   @staticmethod
567   def perspective_instance_os_add(params):
568     """Install an OS on a given instance.
569
570     """
571     inst_s = params[0]
572     inst = objects.Instance.FromDict(inst_s)
573     reinstall = params[1]
574     debug = params[2]
575     return backend.InstanceOsAdd(inst, reinstall, debug)
576
577   @staticmethod
578   def perspective_instance_run_rename(params):
579     """Runs the OS rename script for an instance.
580
581     """
582     inst_s, old_name, debug = params
583     inst = objects.Instance.FromDict(inst_s)
584     return backend.RunRenameInstance(inst, old_name, debug)
585
586   @staticmethod
587   def perspective_instance_shutdown(params):
588     """Shutdown an instance.
589
590     """
591     instance = objects.Instance.FromDict(params[0])
592     timeout = params[1]
593     trail = params[2]
594     _extendReasonTrail(trail, "shutdown")
595     return backend.InstanceShutdown(instance, timeout, trail)
596
597   @staticmethod
598   def perspective_instance_start(params):
599     """Start an instance.
600
601     """
602     (instance_name, startup_paused, trail) = params
603     instance = objects.Instance.FromDict(instance_name)
604     _extendReasonTrail(trail, "start")
605     return backend.StartInstance(instance, startup_paused, trail)
606
607   @staticmethod
608   def perspective_migration_info(params):
609     """Gather information about an instance to be migrated.
610
611     """
612     instance = objects.Instance.FromDict(params[0])
613     return backend.MigrationInfo(instance)
614
615   @staticmethod
616   def perspective_accept_instance(params):
617     """Prepare the node to accept an instance.
618
619     """
620     instance, info, target = params
621     instance = objects.Instance.FromDict(instance)
622     return backend.AcceptInstance(instance, info, target)
623
624   @staticmethod
625   def perspective_instance_finalize_migration_dst(params):
626     """Finalize the instance migration on the destination node.
627
628     """
629     instance, info, success = params
630     instance = objects.Instance.FromDict(instance)
631     return backend.FinalizeMigrationDst(instance, info, success)
632
633   @staticmethod
634   def perspective_instance_migrate(params):
635     """Migrates an instance.
636
637     """
638     cluster_name, instance, target, live = params
639     instance = objects.Instance.FromDict(instance)
640     return backend.MigrateInstance(cluster_name, instance, target, live)
641
642   @staticmethod
643   def perspective_instance_finalize_migration_src(params):
644     """Finalize the instance migration on the source node.
645
646     """
647     instance, success, live = params
648     instance = objects.Instance.FromDict(instance)
649     return backend.FinalizeMigrationSource(instance, success, live)
650
651   @staticmethod
652   def perspective_instance_get_migration_status(params):
653     """Reports migration status.
654
655     """
656     instance = objects.Instance.FromDict(params[0])
657     return backend.GetMigrationStatus(instance).ToDict()
658
659   @staticmethod
660   def perspective_instance_reboot(params):
661     """Reboot an instance.
662
663     """
664     instance = objects.Instance.FromDict(params[0])
665     reboot_type = params[1]
666     shutdown_timeout = params[2]
667     trail = params[3]
668     _extendReasonTrail(trail, "reboot")
669     return backend.InstanceReboot(instance, reboot_type, shutdown_timeout,
670                                   trail)
671
672   @staticmethod
673   def perspective_instance_balloon_memory(params):
674     """Modify instance runtime memory.
675
676     """
677     instance_dict, memory = params
678     instance = objects.Instance.FromDict(instance_dict)
679     return backend.InstanceBalloonMemory(instance, memory)
680
681   @staticmethod
682   def perspective_instance_info(params):
683     """Query instance information.
684
685     """
686     (instance_name, hypervisor_name, hvparams) = params
687     return backend.GetInstanceInfo(instance_name, hypervisor_name, hvparams)
688
689   @staticmethod
690   def perspective_instance_migratable(params):
691     """Query whether the specified instance can be migrated.
692
693     """
694     instance = objects.Instance.FromDict(params[0])
695     return backend.GetInstanceMigratable(instance)
696
697   @staticmethod
698   def perspective_all_instances_info(params):
699     """Query information about all instances.
700
701     """
702     (hypervisor_list, all_hvparams) = params
703     return backend.GetAllInstancesInfo(hypervisor_list, all_hvparams)
704
705   @staticmethod
706   def perspective_instance_list(params):
707     """Query the list of running instances.
708
709     """
710     (hypervisor_list, hvparams) = params
711     return backend.GetInstanceList(hypervisor_list, hvparams)
712
713   # node --------------------------
714
715   @staticmethod
716   def perspective_node_has_ip_address(params):
717     """Checks if a node has the given ip address.
718
719     """
720     return netutils.IPAddress.Own(params[0])
721
722   @staticmethod
723   def perspective_node_info(params):
724     """Query node information.
725
726     """
727     (storage_units, hv_specs, excl_stor) = params
728     return backend.GetNodeInfo(storage_units, hv_specs, excl_stor)
729
730   @staticmethod
731   def perspective_etc_hosts_modify(params):
732     """Modify a node entry in /etc/hosts.
733
734     """
735     backend.EtcHostsModify(params[0], params[1], params[2])
736
737     return True
738
739   @staticmethod
740   def perspective_node_verify(params):
741     """Run a verify sequence on this node.
742
743     """
744     (what, cluster_name, hvparams) = params
745     return backend.VerifyNode(what, cluster_name, hvparams)
746
747   @classmethod
748   def perspective_node_verify_light(cls, params):
749     """Run a light verify sequence on this node.
750
751     """
752     # So far it's the same as the normal node_verify
753     return cls.perspective_node_verify(params)
754
755   @staticmethod
756   def perspective_node_start_master_daemons(params):
757     """Start the master daemons on this node.
758
759     """
760     return backend.StartMasterDaemons(params[0])
761
762   @staticmethod
763   def perspective_node_activate_master_ip(params):
764     """Activate the master IP on this node.
765
766     """
767     master_params = objects.MasterNetworkParameters.FromDict(params[0])
768     return backend.ActivateMasterIp(master_params, params[1])
769
770   @staticmethod
771   def perspective_node_deactivate_master_ip(params):
772     """Deactivate the master IP on this node.
773
774     """
775     master_params = objects.MasterNetworkParameters.FromDict(params[0])
776     return backend.DeactivateMasterIp(master_params, params[1])
777
778   @staticmethod
779   def perspective_node_stop_master(params):
780     """Stops master daemons on this node.
781
782     """
783     return backend.StopMasterDaemons()
784
785   @staticmethod
786   def perspective_node_change_master_netmask(params):
787     """Change the master IP netmask.
788
789     """
790     return backend.ChangeMasterNetmask(params[0], params[1], params[2],
791                                        params[3])
792
793   @staticmethod
794   def perspective_node_leave_cluster(params):
795     """Cleanup after leaving a cluster.
796
797     """
798     return backend.LeaveCluster(params[0])
799
800   @staticmethod
801   def perspective_node_volumes(params):
802     """Query the list of all logical volume groups.
803
804     """
805     return backend.NodeVolumes()
806
807   @staticmethod
808   def perspective_node_demote_from_mc(params):
809     """Demote a node from the master candidate role.
810
811     """
812     return backend.DemoteFromMC()
813
814   @staticmethod
815   def perspective_node_powercycle(params):
816     """Tries to powercycle the nod.
817
818     """
819     (hypervisor_type, hvparams) = params
820     return backend.PowercycleNode(hypervisor_type, hvparams)
821
822   # cluster --------------------------
823
824   @staticmethod
825   def perspective_version(params):
826     """Query version information.
827
828     """
829     return constants.PROTOCOL_VERSION
830
831   @staticmethod
832   def perspective_upload_file(params):
833     """Upload a file.
834
835     Note that the backend implementation imposes strict rules on which
836     files are accepted.
837
838     """
839     return backend.UploadFile(*(params[0]))
840
841   @staticmethod
842   def perspective_master_info(params):
843     """Query master information.
844
845     """
846     return backend.GetMasterInfo()
847
848   @staticmethod
849   def perspective_run_oob(params):
850     """Runs oob on node.
851
852     """
853     output = backend.RunOob(params[0], params[1], params[2], params[3])
854     if output:
855       result = serializer.LoadJson(output)
856     else:
857       result = None
858     return result
859
860   @staticmethod
861   def perspective_restricted_command(params):
862     """Runs a restricted command.
863
864     """
865     (cmd, ) = params
866
867     return backend.RunRestrictedCmd(cmd)
868
869   @staticmethod
870   def perspective_write_ssconf_files(params):
871     """Write ssconf files.
872
873     """
874     (values,) = params
875     return ssconf.WriteSsconfFiles(values)
876
877   @staticmethod
878   def perspective_get_watcher_pause(params):
879     """Get watcher pause end.
880
881     """
882     return utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE)
883
884   @staticmethod
885   def perspective_set_watcher_pause(params):
886     """Set watcher pause.
887
888     """
889     (until, ) = params
890     return backend.SetWatcherPause(until)
891
892   # os -----------------------
893
894   @staticmethod
895   def perspective_os_diagnose(params):
896     """Query detailed information about existing OSes.
897
898     """
899     return backend.DiagnoseOS()
900
901   @staticmethod
902   def perspective_os_get(params):
903     """Query information about a given OS.
904
905     """
906     name = params[0]
907     os_obj = backend.OSFromDisk(name)
908     return os_obj.ToDict()
909
910   @staticmethod
911   def perspective_os_validate(params):
912     """Run a given OS' validation routine.
913
914     """
915     required, name, checks, params = params
916     return backend.ValidateOS(required, name, checks, params)
917
918   # extstorage -----------------------
919
920   @staticmethod
921   def perspective_extstorage_diagnose(params):
922     """Query detailed information about existing extstorage providers.
923
924     """
925     return backend.DiagnoseExtStorage()
926
927   # hooks -----------------------
928
929   @staticmethod
930   def perspective_hooks_runner(params):
931     """Run hook scripts.
932
933     """
934     hpath, phase, env = params
935     hr = backend.HooksRunner()
936     return hr.RunHooks(hpath, phase, env)
937
938   # iallocator -----------------
939
940   @staticmethod
941   def perspective_iallocator_runner(params):
942     """Run an iallocator script.
943
944     """
945     name, idata = params
946     iar = backend.IAllocatorRunner()
947     return iar.Run(name, idata)
948
949   # test -----------------------
950
951   @staticmethod
952   def perspective_test_delay(params):
953     """Run test delay.
954
955     """
956     duration = params[0]
957     status, rval = utils.TestDelay(duration)
958     if not status:
959       raise backend.RPCFail(rval)
960     return rval
961
962   # file storage ---------------
963
964   @staticmethod
965   def perspective_file_storage_dir_create(params):
966     """Create the file storage directory.
967
968     """
969     file_storage_dir = params[0]
970     return backend.CreateFileStorageDir(file_storage_dir)
971
972   @staticmethod
973   def perspective_file_storage_dir_remove(params):
974     """Remove the file storage directory.
975
976     """
977     file_storage_dir = params[0]
978     return backend.RemoveFileStorageDir(file_storage_dir)
979
980   @staticmethod
981   def perspective_file_storage_dir_rename(params):
982     """Rename the file storage directory.
983
984     """
985     old_file_storage_dir = params[0]
986     new_file_storage_dir = params[1]
987     return backend.RenameFileStorageDir(old_file_storage_dir,
988                                         new_file_storage_dir)
989
990   # jobs ------------------------
991
992   @staticmethod
993   @_RequireJobQueueLock
994   def perspective_jobqueue_update(params):
995     """Update job queue.
996
997     """
998     (file_name, content) = params
999     return backend.JobQueueUpdate(file_name, content)
1000
1001   @staticmethod
1002   @_RequireJobQueueLock
1003   def perspective_jobqueue_purge(params):
1004     """Purge job queue.
1005
1006     """
1007     return backend.JobQueuePurge()
1008
1009   @staticmethod
1010   @_RequireJobQueueLock
1011   def perspective_jobqueue_rename(params):
1012     """Rename a job queue file.
1013
1014     """
1015     # TODO: What if a file fails to rename?
1016     return [backend.JobQueueRename(old, new) for old, new in params[0]]
1017
1018   @staticmethod
1019   @_RequireJobQueueLock
1020   def perspective_jobqueue_set_drain_flag(params):
1021     """Set job queue's drain flag.
1022
1023     """
1024     (flag, ) = params
1025
1026     return jstore.SetDrainFlag(flag)
1027
1028   # hypervisor ---------------
1029
1030   @staticmethod
1031   def perspective_hypervisor_validate_params(params):
1032     """Validate the hypervisor parameters.
1033
1034     """
1035     (hvname, hvparams) = params
1036     return backend.ValidateHVParams(hvname, hvparams)
1037
1038   # Crypto
1039
1040   @staticmethod
1041   def perspective_x509_cert_create(params):
1042     """Creates a new X509 certificate for SSL/TLS.
1043
1044     """
1045     (validity, ) = params
1046     return backend.CreateX509Certificate(validity)
1047
1048   @staticmethod
1049   def perspective_x509_cert_remove(params):
1050     """Removes a X509 certificate.
1051
1052     """
1053     (name, ) = params
1054     return backend.RemoveX509Certificate(name)
1055
1056   # Import and export
1057
1058   @staticmethod
1059   def perspective_import_start(params):
1060     """Starts an import daemon.
1061
1062     """
1063     (opts_s, instance, component, (dest, dest_args)) = params
1064
1065     opts = objects.ImportExportOptions.FromDict(opts_s)
1066
1067     return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts,
1068                                            None, None,
1069                                            objects.Instance.FromDict(instance),
1070                                            component, dest,
1071                                            _DecodeImportExportIO(dest,
1072                                                                  dest_args))
1073
1074   @staticmethod
1075   def perspective_export_start(params):
1076     """Starts an export daemon.
1077
1078     """
1079     (opts_s, host, port, instance, component, (source, source_args)) = params
1080
1081     opts = objects.ImportExportOptions.FromDict(opts_s)
1082
1083     return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts,
1084                                            host, port,
1085                                            objects.Instance.FromDict(instance),
1086                                            component, source,
1087                                            _DecodeImportExportIO(source,
1088                                                                  source_args))
1089
1090   @staticmethod
1091   def perspective_impexp_status(params):
1092     """Retrieves the status of an import or export daemon.
1093
1094     """
1095     return backend.GetImportExportStatus(params[0])
1096
1097   @staticmethod
1098   def perspective_impexp_abort(params):
1099     """Aborts an import or export.
1100
1101     """
1102     return backend.AbortImportExport(params[0])
1103
1104   @staticmethod
1105   def perspective_impexp_cleanup(params):
1106     """Cleans up after an import or export.
1107
1108     """
1109     return backend.CleanupImportExport(params[0])
1110
1111
1112 def CheckNoded(_, args):
1113   """Initial checks whether to run or exit with a failure.
1114
1115   """
1116   if args: # noded doesn't take any arguments
1117     print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
1118                           sys.argv[0])
1119     sys.exit(constants.EXIT_FAILURE)
1120   try:
1121     codecs.lookup("string-escape")
1122   except LookupError:
1123     print >> sys.stderr, ("Can't load the string-escape code which is part"
1124                           " of the Python installation. Is your installation"
1125                           " complete/correct? Aborting.")
1126     sys.exit(constants.EXIT_FAILURE)
1127
1128
1129 def PrepNoded(options, _):
1130   """Preparation node daemon function, executed with the PID file held.
1131
1132   """
1133   if options.mlock:
1134     request_executor_class = MlockallRequestExecutor
1135     try:
1136       utils.Mlockall()
1137     except errors.NoCtypesError:
1138       logging.warning("Cannot set memory lock, ctypes module not found")
1139       request_executor_class = http.server.HttpServerRequestExecutor
1140   else:
1141     request_executor_class = http.server.HttpServerRequestExecutor
1142
1143   # Read SSL certificate
1144   if options.ssl:
1145     ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
1146                                     ssl_cert_path=options.ssl_cert)
1147   else:
1148     ssl_params = None
1149
1150   err = _PrepareQueueLock()
1151   if err is not None:
1152     # this might be some kind of file-system/permission error; while
1153     # this breaks the job queue functionality, we shouldn't prevent
1154     # startup of the whole node daemon because of this
1155     logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)
1156
1157   handler = NodeRequestHandler()
1158
1159   mainloop = daemon.Mainloop()
1160   server = \
1161     http.server.HttpServer(mainloop, options.bind_address, options.port,
1162                            handler, ssl_params=ssl_params, ssl_verify_peer=True,
1163                            request_executor_class=request_executor_class)
1164   server.Start()
1165
1166   return (mainloop, server)
1167
1168
1169 def ExecNoded(options, args, prep_data): # pylint: disable=W0613
1170   """Main node daemon function, executed with the PID file held.
1171
1172   """
1173   (mainloop, server) = prep_data
1174   try:
1175     mainloop.Run()
1176   finally:
1177     server.Stop()
1178
1179
1180 def Main():
1181   """Main function for the node daemon.
1182
1183   """
1184   parser = OptionParser(description="Ganeti node daemon",
1185                         usage="%prog [-f] [-d] [-p port] [-b ADDRESS]\
1186                               \ [-i INTERFACE]",
1187                         version="%%prog (ganeti) %s" %
1188                         constants.RELEASE_VERSION)
1189   parser.add_option("--no-mlock", dest="mlock",
1190                     help="Do not mlock the node memory in ram",
1191                     default=True, action="store_false")
1192
1193   daemon.GenericMain(constants.NODED, parser, CheckNoded, PrepNoded, ExecNoded,
1194                      default_ssl_cert=pathutils.NODED_CERT_FILE,
1195                      default_ssl_key=pathutils.NODED_CERT_FILE,
1196                      console_logging=True)