Add RPC calls to modify storage fields
[ganeti-local] / daemons / ganeti-noded
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti node daemon"""
23
24 # functions in this module need to have a given name structure, so:
25 # pylint: disable-msg=C0103
26
27 import os
28 import sys
29 import traceback
30 import SocketServer
31 import errno
32 import logging
33 import signal
34
35 from optparse import OptionParser
36
37 from ganeti import backend
38 from ganeti import constants
39 from ganeti import objects
40 from ganeti import errors
41 from ganeti import jstore
42 from ganeti import daemon
43 from ganeti import http
44 from ganeti import utils
45 from ganeti import storage
46
47 import ganeti.http.server
48
49
50 queue_lock = None
51
52
53 def _RequireJobQueueLock(fn):
54   """Decorator for job queue manipulating functions.
55
56   """
57   QUEUE_LOCK_TIMEOUT = 10
58
59   def wrapper(*args, **kwargs):
60     # Locking in exclusive, blocking mode because there could be several
61     # children running at the same time. Waiting up to 10 seconds.
62     queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
63     try:
64       return fn(*args, **kwargs)
65     finally:
66       queue_lock.Unlock()
67
68   return wrapper
69
70
71 class NodeHttpServer(http.server.HttpServer):
72   """The server implementation.
73
74   This class holds all methods exposed over the RPC interface.
75
76   """
77   def __init__(self, *args, **kwargs):
78     http.server.HttpServer.__init__(self, *args, **kwargs)
79     self.noded_pid = os.getpid()
80
81   def HandleRequest(self, req):
82     """Handle a request.
83
84     """
85     if req.request_method.upper() != http.HTTP_PUT:
86       raise http.HttpBadRequest()
87
88     path = req.request_path
89     if path.startswith("/"):
90       path = path[1:]
91
92     method = getattr(self, "perspective_%s" % path, None)
93     if method is None:
94       raise http.HttpNotFound()
95
96     try:
97       rvalue = method(req.request_body)
98       return True, rvalue
99
100     except backend.RPCFail, err:
101       # our custom failure exception; str(err) works fine if the
102       # exception was constructed with a single argument, and in
103       # this case, err.message == err.args[0] == str(err)
104       return (False, str(err))
105     except errors.QuitGanetiException, err:
106       # Tell parent to quit
107       logging.info("Shutting down the node daemon, arguments: %s",
108                    str(err.args))
109       os.kill(self.noded_pid, signal.SIGTERM)
110       # And return the error's arguments, which must be already in
111       # correct tuple format
112       return err.args
113     except Exception, err:
114       logging.exception("Error in RPC call")
115       return False, "Error while executing backend function: %s" % str(err)
116
117   # the new block devices  --------------------------
118
119   @staticmethod
120   def perspective_blockdev_create(params):
121     """Create a block device.
122
123     """
124     bdev_s, size, owner, on_primary, info = params
125     bdev = objects.Disk.FromDict(bdev_s)
126     if bdev is None:
127       raise ValueError("can't unserialize data!")
128     return backend.BlockdevCreate(bdev, size, owner, on_primary, info)
129
130   @staticmethod
131   def perspective_blockdev_remove(params):
132     """Remove a block device.
133
134     """
135     bdev_s = params[0]
136     bdev = objects.Disk.FromDict(bdev_s)
137     return backend.BlockdevRemove(bdev)
138
139   @staticmethod
140   def perspective_blockdev_rename(params):
141     """Remove a block device.
142
143     """
144     devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
145     return backend.BlockdevRename(devlist)
146
147   @staticmethod
148   def perspective_blockdev_assemble(params):
149     """Assemble a block device.
150
151     """
152     bdev_s, owner, on_primary = params
153     bdev = objects.Disk.FromDict(bdev_s)
154     if bdev is None:
155       raise ValueError("can't unserialize data!")
156     return backend.BlockdevAssemble(bdev, owner, on_primary)
157
158   @staticmethod
159   def perspective_blockdev_shutdown(params):
160     """Shutdown a block device.
161
162     """
163     bdev_s = params[0]
164     bdev = objects.Disk.FromDict(bdev_s)
165     if bdev is None:
166       raise ValueError("can't unserialize data!")
167     return backend.BlockdevShutdown(bdev)
168
169   @staticmethod
170   def perspective_blockdev_addchildren(params):
171     """Add a child to a mirror device.
172
173     Note: this is only valid for mirror devices. It's the caller's duty
174     to send a correct disk, otherwise we raise an error.
175
176     """
177     bdev_s, ndev_s = params
178     bdev = objects.Disk.FromDict(bdev_s)
179     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
180     if bdev is None or ndevs.count(None) > 0:
181       raise ValueError("can't unserialize data!")
182     return backend.BlockdevAddchildren(bdev, ndevs)
183
184   @staticmethod
185   def perspective_blockdev_removechildren(params):
186     """Remove a child from a mirror device.
187
188     This is only valid for mirror devices, of course. It's the callers
189     duty to send a correct disk, otherwise we raise an error.
190
191     """
192     bdev_s, ndev_s = params
193     bdev = objects.Disk.FromDict(bdev_s)
194     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
195     if bdev is None or ndevs.count(None) > 0:
196       raise ValueError("can't unserialize data!")
197     return backend.BlockdevRemovechildren(bdev, ndevs)
198
199   @staticmethod
200   def perspective_blockdev_getmirrorstatus(params):
201     """Return the mirror status for a list of disks.
202
203     """
204     disks = [objects.Disk.FromDict(dsk_s)
205             for dsk_s in params]
206     return backend.BlockdevGetmirrorstatus(disks)
207
208   @staticmethod
209   def perspective_blockdev_find(params):
210     """Expose the FindBlockDevice functionality for a disk.
211
212     This will try to find but not activate a disk.
213
214     """
215     disk = objects.Disk.FromDict(params[0])
216     return backend.BlockdevFind(disk)
217
218   @staticmethod
219   def perspective_blockdev_snapshot(params):
220     """Create a snapshot device.
221
222     Note that this is only valid for LVM disks, if we get passed
223     something else we raise an exception. The snapshot device can be
224     remove by calling the generic block device remove call.
225
226     """
227     cfbd = objects.Disk.FromDict(params[0])
228     return backend.BlockdevSnapshot(cfbd)
229
230   @staticmethod
231   def perspective_blockdev_grow(params):
232     """Grow a stack of devices.
233
234     """
235     cfbd = objects.Disk.FromDict(params[0])
236     amount = params[1]
237     return backend.BlockdevGrow(cfbd, amount)
238
239   @staticmethod
240   def perspective_blockdev_close(params):
241     """Closes the given block devices.
242
243     """
244     disks = [objects.Disk.FromDict(cf) for cf in params[1]]
245     return backend.BlockdevClose(params[0], disks)
246
247   # blockdev/drbd specific methods ----------
248
249   @staticmethod
250   def perspective_drbd_disconnect_net(params):
251     """Disconnects the network connection of drbd disks.
252
253     Note that this is only valid for drbd disks, so the members of the
254     disk list must all be drbd devices.
255
256     """
257     nodes_ip, disks = params
258     disks = [objects.Disk.FromDict(cf) for cf in disks]
259     return backend.DrbdDisconnectNet(nodes_ip, disks)
260
261   @staticmethod
262   def perspective_drbd_attach_net(params):
263     """Attaches the network connection of drbd disks.
264
265     Note that this is only valid for drbd disks, so the members of the
266     disk list must all be drbd devices.
267
268     """
269     nodes_ip, disks, instance_name, multimaster = params
270     disks = [objects.Disk.FromDict(cf) for cf in disks]
271     return backend.DrbdAttachNet(nodes_ip, disks,
272                                      instance_name, multimaster)
273
274   @staticmethod
275   def perspective_drbd_wait_sync(params):
276     """Wait until DRBD disks are synched.
277
278     Note that this is only valid for drbd disks, so the members of the
279     disk list must all be drbd devices.
280
281     """
282     nodes_ip, disks = params
283     disks = [objects.Disk.FromDict(cf) for cf in disks]
284     return backend.DrbdWaitSync(nodes_ip, disks)
285
286   # export/import  --------------------------
287
288   @staticmethod
289   def perspective_snapshot_export(params):
290     """Export a given snapshot.
291
292     """
293     disk = objects.Disk.FromDict(params[0])
294     dest_node = params[1]
295     instance = objects.Instance.FromDict(params[2])
296     cluster_name = params[3]
297     dev_idx = params[4]
298     return backend.ExportSnapshot(disk, dest_node, instance,
299                                   cluster_name, dev_idx)
300
301   @staticmethod
302   def perspective_finalize_export(params):
303     """Expose the finalize export functionality.
304
305     """
306     instance = objects.Instance.FromDict(params[0])
307     snap_disks = [objects.Disk.FromDict(str_data)
308                   for str_data in params[1]]
309     return backend.FinalizeExport(instance, snap_disks)
310
311   @staticmethod
312   def perspective_export_info(params):
313     """Query information about an existing export on this node.
314
315     The given path may not contain an export, in which case we return
316     None.
317
318     """
319     path = params[0]
320     return backend.ExportInfo(path)
321
322   @staticmethod
323   def perspective_export_list(params):
324     """List the available exports on this node.
325
326     Note that as opposed to export_info, which may query data about an
327     export in any path, this only queries the standard Ganeti path
328     (constants.EXPORT_DIR).
329
330     """
331     return backend.ListExports()
332
333   @staticmethod
334   def perspective_export_remove(params):
335     """Remove an export.
336
337     """
338     export = params[0]
339     return backend.RemoveExport(export)
340
341   # volume  --------------------------
342
343   @staticmethod
344   def perspective_lv_list(params):
345     """Query the list of logical volumes in a given volume group.
346
347     """
348     vgname = params[0]
349     return backend.GetVolumeList(vgname)
350
351   @staticmethod
352   def perspective_vg_list(params):
353     """Query the list of volume groups.
354
355     """
356     return backend.ListVolumeGroups()
357
358   # Storage --------------------------
359
360   @staticmethod
361   def perspective_storage_list(params):
362     """Get list of storage units.
363
364     """
365     (su_name, su_args, name, fields) = params
366     return storage.GetStorage(su_name, *su_args).List(name, fields)
367
368   @staticmethod
369   def perspective_storage_modify(params):
370     """Modify a storage unit.
371
372     """
373     (su_name, su_args, name, changes) = params
374     return storage.GetStorage(su_name, *su_args).Modify(name, changes)
375
376   # bridge  --------------------------
377
378   @staticmethod
379   def perspective_bridges_exist(params):
380     """Check if all bridges given exist on this node.
381
382     """
383     bridges_list = params[0]
384     return backend.BridgesExist(bridges_list)
385
386   # instance  --------------------------
387
388   @staticmethod
389   def perspective_instance_os_add(params):
390     """Install an OS on a given instance.
391
392     """
393     inst_s = params[0]
394     inst = objects.Instance.FromDict(inst_s)
395     reinstall = params[1]
396     return backend.InstanceOsAdd(inst, reinstall)
397
398   @staticmethod
399   def perspective_instance_run_rename(params):
400     """Runs the OS rename script for an instance.
401
402     """
403     inst_s, old_name = params
404     inst = objects.Instance.FromDict(inst_s)
405     return backend.RunRenameInstance(inst, old_name)
406
407   @staticmethod
408   def perspective_instance_os_import(params):
409     """Run the import function of an OS onto a given instance.
410
411     """
412     inst_s, src_node, src_images, cluster_name = params
413     inst = objects.Instance.FromDict(inst_s)
414     return backend.ImportOSIntoInstance(inst, src_node, src_images,
415                                         cluster_name)
416
417   @staticmethod
418   def perspective_instance_shutdown(params):
419     """Shutdown an instance.
420
421     """
422     instance = objects.Instance.FromDict(params[0])
423     return backend.InstanceShutdown(instance)
424
425   @staticmethod
426   def perspective_instance_start(params):
427     """Start an instance.
428
429     """
430     instance = objects.Instance.FromDict(params[0])
431     return backend.StartInstance(instance)
432
433   @staticmethod
434   def perspective_migration_info(params):
435     """Gather information about an instance to be migrated.
436
437     """
438     instance = objects.Instance.FromDict(params[0])
439     return backend.MigrationInfo(instance)
440
441   @staticmethod
442   def perspective_accept_instance(params):
443     """Prepare the node to accept an instance.
444
445     """
446     instance, info, target = params
447     instance = objects.Instance.FromDict(instance)
448     return backend.AcceptInstance(instance, info, target)
449
450   @staticmethod
451   def perspective_finalize_migration(params):
452     """Finalize the instance migration.
453
454     """
455     instance, info, success = params
456     instance = objects.Instance.FromDict(instance)
457     return backend.FinalizeMigration(instance, info, success)
458
459   @staticmethod
460   def perspective_instance_migrate(params):
461     """Migrates an instance.
462
463     """
464     instance, target, live = params
465     instance = objects.Instance.FromDict(instance)
466     return backend.MigrateInstance(instance, target, live)
467
468   @staticmethod
469   def perspective_instance_reboot(params):
470     """Reboot an instance.
471
472     """
473     instance = objects.Instance.FromDict(params[0])
474     reboot_type = params[1]
475     return backend.InstanceReboot(instance, reboot_type)
476
477   @staticmethod
478   def perspective_instance_info(params):
479     """Query instance information.
480
481     """
482     return backend.GetInstanceInfo(params[0], params[1])
483
484   @staticmethod
485   def perspective_instance_migratable(params):
486     """Query whether the specified instance can be migrated.
487
488     """
489     instance = objects.Instance.FromDict(params[0])
490     return backend.GetInstanceMigratable(instance)
491
492   @staticmethod
493   def perspective_all_instances_info(params):
494     """Query information about all instances.
495
496     """
497     return backend.GetAllInstancesInfo(params[0])
498
499   @staticmethod
500   def perspective_instance_list(params):
501     """Query the list of running instances.
502
503     """
504     return backend.GetInstanceList(params[0])
505
506   # node --------------------------
507
508   @staticmethod
509   def perspective_node_tcp_ping(params):
510     """Do a TcpPing on the remote node.
511
512     """
513     return utils.TcpPing(params[1], params[2], timeout=params[3],
514                          live_port_needed=params[4], source=params[0])
515
516   @staticmethod
517   def perspective_node_has_ip_address(params):
518     """Checks if a node has the given ip address.
519
520     """
521     return utils.OwnIpAddress(params[0])
522
523   @staticmethod
524   def perspective_node_info(params):
525     """Query node information.
526
527     """
528     vgname, hypervisor_type = params
529     return backend.GetNodeInfo(vgname, hypervisor_type)
530
531   @staticmethod
532   def perspective_node_add(params):
533     """Complete the registration of this node in the cluster.
534
535     """
536     return backend.AddNode(params[0], params[1], params[2],
537                            params[3], params[4], params[5])
538
539   @staticmethod
540   def perspective_node_verify(params):
541     """Run a verify sequence on this node.
542
543     """
544     return backend.VerifyNode(params[0], params[1])
545
546   @staticmethod
547   def perspective_node_start_master(params):
548     """Promote this node to master status.
549
550     """
551     return backend.StartMaster(params[0], params[1])
552
553   @staticmethod
554   def perspective_node_stop_master(params):
555     """Demote this node from master status.
556
557     """
558     return backend.StopMaster(params[0])
559
560   @staticmethod
561   def perspective_node_leave_cluster(params):
562     """Cleanup after leaving a cluster.
563
564     """
565     return backend.LeaveCluster()
566
567   @staticmethod
568   def perspective_node_volumes(params):
569     """Query the list of all logical volume groups.
570
571     """
572     return backend.NodeVolumes()
573
574   @staticmethod
575   def perspective_node_demote_from_mc(params):
576     """Demote a node from the master candidate role.
577
578     """
579     return backend.DemoteFromMC()
580
581
582   @staticmethod
583   def perspective_node_powercycle(params):
584     """Tries to powercycle the nod.
585
586     """
587     hypervisor_type = params[0]
588     return backend.PowercycleNode(hypervisor_type)
589
590
591   # cluster --------------------------
592
593   @staticmethod
594   def perspective_version(params):
595     """Query version information.
596
597     """
598     return constants.PROTOCOL_VERSION
599
600   @staticmethod
601   def perspective_upload_file(params):
602     """Upload a file.
603
604     Note that the backend implementation imposes strict rules on which
605     files are accepted.
606
607     """
608     return backend.UploadFile(*params)
609
610   @staticmethod
611   def perspective_master_info(params):
612     """Query master information.
613
614     """
615     return backend.GetMasterInfo()
616
617   @staticmethod
618   def perspective_write_ssconf_files(params):
619     """Write ssconf files.
620
621     """
622     (values,) = params
623     return backend.WriteSsconfFiles(values)
624
625   # os -----------------------
626
627   @staticmethod
628   def perspective_os_diagnose(params):
629     """Query detailed information about existing OSes.
630
631     """
632     return backend.DiagnoseOS()
633
634   @staticmethod
635   def perspective_os_get(params):
636     """Query information about a given OS.
637
638     """
639     name = params[0]
640     os_obj = backend.OSFromDisk(name)
641     return os_obj.ToDict()
642
643   # hooks -----------------------
644
645   @staticmethod
646   def perspective_hooks_runner(params):
647     """Run hook scripts.
648
649     """
650     hpath, phase, env = params
651     hr = backend.HooksRunner()
652     return hr.RunHooks(hpath, phase, env)
653
654   # iallocator -----------------
655
656   @staticmethod
657   def perspective_iallocator_runner(params):
658     """Run an iallocator script.
659
660     """
661     name, idata = params
662     iar = backend.IAllocatorRunner()
663     return iar.Run(name, idata)
664
665   # test -----------------------
666
667   @staticmethod
668   def perspective_test_delay(params):
669     """Run test delay.
670
671     """
672     duration = params[0]
673     status, rval = utils.TestDelay(duration)
674     if not status:
675       raise backend.RPCFail(rval)
676     return rval
677
678   # file storage ---------------
679
680   @staticmethod
681   def perspective_file_storage_dir_create(params):
682     """Create the file storage directory.
683
684     """
685     file_storage_dir = params[0]
686     return backend.CreateFileStorageDir(file_storage_dir)
687
688   @staticmethod
689   def perspective_file_storage_dir_remove(params):
690     """Remove the file storage directory.
691
692     """
693     file_storage_dir = params[0]
694     return backend.RemoveFileStorageDir(file_storage_dir)
695
696   @staticmethod
697   def perspective_file_storage_dir_rename(params):
698     """Rename the file storage directory.
699
700     """
701     old_file_storage_dir = params[0]
702     new_file_storage_dir = params[1]
703     return backend.RenameFileStorageDir(old_file_storage_dir,
704                                         new_file_storage_dir)
705
706   # jobs ------------------------
707
708   @staticmethod
709   @_RequireJobQueueLock
710   def perspective_jobqueue_update(params):
711     """Update job queue.
712
713     """
714     (file_name, content) = params
715     return backend.JobQueueUpdate(file_name, content)
716
717   @staticmethod
718   @_RequireJobQueueLock
719   def perspective_jobqueue_purge(params):
720     """Purge job queue.
721
722     """
723     return backend.JobQueuePurge()
724
725   @staticmethod
726   @_RequireJobQueueLock
727   def perspective_jobqueue_rename(params):
728     """Rename a job queue file.
729
730     """
731     # TODO: What if a file fails to rename?
732     return [backend.JobQueueRename(old, new) for old, new in params]
733
734   @staticmethod
735   def perspective_jobqueue_set_drain(params):
736     """Set/unset the queue drain flag.
737
738     """
739     drain_flag = params[0]
740     return backend.JobQueueSetDrainFlag(drain_flag)
741
742
743   # hypervisor ---------------
744
745   @staticmethod
746   def perspective_hypervisor_validate_params(params):
747     """Validate the hypervisor parameters.
748
749     """
750     (hvname, hvparams) = params
751     return backend.ValidateHVParams(hvname, hvparams)
752
753
754 def ExecNODED(options, args):
755   """Main NODED function, executed with the pidfile held.
756
757   """
758   global queue_lock
759
760   # Read SSL certificate
761   if options.ssl:
762     ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
763                                     ssl_cert_path=options.ssl_cert)
764   else:
765     ssl_params = None
766
767   # Prepare job queue
768   queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
769
770   mainloop = daemon.Mainloop()
771   server = NodeHttpServer(mainloop, options.bind_address, options.port,
772                           ssl_params=ssl_params, ssl_verify_peer=True)
773   server.Start()
774   try:
775     mainloop.Run()
776   finally:
777     server.Stop()
778
779
780 def main():
781   """Main function for the node daemon.
782
783   """
784   parser = OptionParser(description="Ganeti node daemon",
785                         usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
786                         version="%%prog (ganeti) %s" %
787                         constants.RELEASE_VERSION)
788   dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
789   dirs.append((constants.LOG_OS_DIR, 0750))
790   dirs.append((constants.LOCK_DIR, 1777))
791   daemon.GenericMain(constants.NODED, parser, dirs, None, ExecNODED)
792
793
794 if __name__ == '__main__':
795   main()