Forward-port: Small codestyle fixes for dumb-allocator
[ganeti-local] / daemons / ganeti-noded
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti node daemon"""
23
24 # functions in this module need to have a given name structure, so:
25 # pylint: disable-msg=C0103
26
27 import os
28 import sys
29 import traceback
30 import BaseHTTPServer
31 import simplejson
32 import errno
33
34 from optparse import OptionParser
35
36
37 from ganeti import backend
38 from ganeti import logger
39 from ganeti import constants
40 from ganeti import objects
41 from ganeti import errors
42 from ganeti import ssconf
43 from ganeti import utils
44
45
46 class ServerObject(BaseHTTPServer.BaseHTTPRequestHandler):
47   """The server implementation.
48
49   This class holds all methods exposed over the RPC interface.
50
51   """
52   def do_PUT(self):
53     """Handle a post request.
54
55     """
56     path = self.path
57     if path.startswith("/"):
58       path = path[1:]
59     mname = "perspective_%s" % path
60     if not hasattr(self, mname):
61       self.send_error(404)
62       return False
63
64     method = getattr(self, mname)
65     try:
66       body_length = int(self.headers.get('Content-Length', '0'))
67     except ValueError:
68       self.send_error(400, 'No Content-Length header or invalid format')
69       return False
70
71     try:
72       body = self.rfile.read(body_length)
73     except socket.error, err:
74       logger.Error("Socket error while reading: %s" % str(err))
75       return
76     try:
77       params = simplejson.loads(body)
78       result = method(params)
79       payload = simplejson.dumps(result)
80     except Exception, err:
81       self.send_error(500, "Error: %s" % str(err))
82       return False
83     self.send_response(200)
84     self.send_header('Content-Length', str(len(payload)))
85     self.end_headers()
86     self.wfile.write(payload)
87     return True
88
89   def log_message(self, format, *args):
90     """Log a request to the log.
91
92     This is the same as the parent, we just log somewhere else.
93
94     """
95     msg = ("%s - - [%s] %s" %
96            (self.address_string(),
97             self.log_date_time_string(),
98             format % args))
99     logger.Debug(msg)
100
101   # the new block devices  --------------------------
102
103   @staticmethod
104   def perspective_blockdev_create(params):
105     """Create a block device.
106
107     """
108     bdev_s, size, owner, on_primary, info = params
109     bdev = objects.Disk.FromDict(bdev_s)
110     if bdev is None:
111       raise ValueError("can't unserialize data!")
112     return backend.CreateBlockDevice(bdev, size, owner, on_primary, info)
113
114   @staticmethod
115   def perspective_blockdev_remove(params):
116     """Remove a block device.
117
118     """
119     bdev_s = params[0]
120     bdev = objects.Disk.FromDict(bdev_s)
121     return backend.RemoveBlockDevice(bdev)
122
123   @staticmethod
124   def perspective_blockdev_rename(params):
125     """Remove a block device.
126
127     """
128     devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
129     return backend.RenameBlockDevices(devlist)
130
131   @staticmethod
132   def perspective_blockdev_assemble(params):
133     """Assemble a block device.
134
135     """
136     bdev_s, owner, on_primary = params
137     bdev = objects.Disk.FromDict(bdev_s)
138     if bdev is None:
139       raise ValueError("can't unserialize data!")
140     return backend.AssembleBlockDevice(bdev, owner, on_primary)
141
142   @staticmethod
143   def perspective_blockdev_shutdown(params):
144     """Shutdown a block device.
145
146     """
147     bdev_s = params[0]
148     bdev = objects.Disk.FromDict(bdev_s)
149     if bdev is None:
150       raise ValueError("can't unserialize data!")
151     return backend.ShutdownBlockDevice(bdev)
152
153   @staticmethod
154   def perspective_blockdev_addchildren(params):
155     """Add a child to a mirror device.
156
157     Note: this is only valid for mirror devices. It's the caller's duty
158     to send a correct disk, otherwise we raise an error.
159
160     """
161     bdev_s, ndev_s = params
162     bdev = objects.Disk.FromDict(bdev_s)
163     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
164     if bdev is None or ndevs.count(None) > 0:
165       raise ValueError("can't unserialize data!")
166     return backend.MirrorAddChildren(bdev, ndevs)
167
168   @staticmethod
169   def perspective_blockdev_removechildren(params):
170     """Remove a child from a mirror device.
171
172     This is only valid for mirror devices, of course. It's the callers
173     duty to send a correct disk, otherwise we raise an error.
174
175     """
176     bdev_s, ndev_s = params
177     bdev = objects.Disk.FromDict(bdev_s)
178     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
179     if bdev is None or ndevs.count(None) > 0:
180       raise ValueError("can't unserialize data!")
181     return backend.MirrorRemoveChildren(bdev, ndevs)
182
183   @staticmethod
184   def perspective_blockdev_getmirrorstatus(params):
185     """Return the mirror status for a list of disks.
186
187     """
188     disks = [objects.Disk.FromDict(dsk_s)
189             for dsk_s in params]
190     return backend.GetMirrorStatus(disks)
191
192   @staticmethod
193   def perspective_blockdev_find(params):
194     """Expose the FindBlockDevice functionality for a disk.
195
196     This will try to find but not activate a disk.
197
198     """
199     disk = objects.Disk.FromDict(params[0])
200     return backend.FindBlockDevice(disk)
201
202   @staticmethod
203   def perspective_blockdev_snapshot(params):
204     """Create a snapshot device.
205
206     Note that this is only valid for LVM disks, if we get passed
207     something else we raise an exception. The snapshot device can be
208     remove by calling the generic block device remove call.
209
210     """
211     cfbd = objects.Disk.FromDict(params[0])
212     return backend.SnapshotBlockDevice(cfbd)
213
214   # export/import  --------------------------
215
216   @staticmethod
217   def perspective_snapshot_export(params):
218     """Export a given snapshot.
219
220     """
221     disk = objects.Disk.FromDict(params[0])
222     dest_node = params[1]
223     instance = objects.Instance.FromDict(params[2])
224     return backend.ExportSnapshot(disk, dest_node, instance)
225
226   @staticmethod
227   def perspective_finalize_export(params):
228     """Expose the finalize export functionality.
229
230     """
231     instance = objects.Instance.FromDict(params[0])
232     snap_disks = [objects.Disk.FromDict(str_data)
233                   for str_data in params[1]]
234     return backend.FinalizeExport(instance, snap_disks)
235
236   @staticmethod
237   def perspective_export_info(params):
238     """Query information about an existing export on this node.
239
240     The given path may not contain an export, in which case we return
241     None.
242
243     """
244     path = params[0]
245     einfo = backend.ExportInfo(path)
246     if einfo is None:
247       return einfo
248     return einfo.Dumps()
249
250   @staticmethod
251   def perspective_export_list(params):
252     """List the available exports on this node.
253
254     Note that as opposed to export_info, which may query data about an
255     export in any path, this only queries the standard Ganeti path
256     (constants.EXPORT_DIR).
257
258     """
259     return backend.ListExports()
260
261   @staticmethod
262   def perspective_export_remove(params):
263     """Remove an export.
264
265     """
266     export = params[0]
267     return backend.RemoveExport(export)
268
269   # volume  --------------------------
270
271   @staticmethod
272   def perspective_volume_list(params):
273     """Query the list of logical volumes in a given volume group.
274
275     """
276     vgname = params[0]
277     return backend.GetVolumeList(vgname)
278
279   @staticmethod
280   def perspective_vg_list(params):
281     """Query the list of volume groups.
282
283     """
284     return backend.ListVolumeGroups()
285
286   # bridge  --------------------------
287
288   @staticmethod
289   def perspective_bridges_exist(params):
290     """Check if all bridges given exist on this node.
291
292     """
293     bridges_list = params[0]
294     return backend.BridgesExist(bridges_list)
295
296   # instance  --------------------------
297
298   @staticmethod
299   def perspective_instance_os_add(params):
300     """Install an OS on a given instance.
301
302     """
303     inst_s, os_disk, swap_disk = params
304     inst = objects.Instance.FromDict(inst_s)
305     return backend.AddOSToInstance(inst, os_disk, swap_disk)
306
307   @staticmethod
308   def perspective_instance_run_rename(params):
309     """Runs the OS rename script for an instance.
310
311     """
312     inst_s, old_name, os_disk, swap_disk = params
313     inst = objects.Instance.FromDict(inst_s)
314     return backend.RunRenameInstance(inst, old_name, os_disk, swap_disk)
315
316   @staticmethod
317   def perspective_instance_os_import(params):
318     """Run the import function of an OS onto a given instance.
319
320     """
321     inst_s, os_disk, swap_disk, src_node, src_image = params
322     inst = objects.Instance.FromDict(inst_s)
323     return backend.ImportOSIntoInstance(inst, os_disk, swap_disk,
324                                         src_node, src_image)
325
326   @staticmethod
327   def perspective_instance_shutdown(params):
328     """Shutdown an instance.
329
330     """
331     instance = objects.Instance.FromDict(params[0])
332     return backend.ShutdownInstance(instance)
333
334   @staticmethod
335   def perspective_instance_start(params):
336     """Start an instance.
337
338     """
339     instance = objects.Instance.FromDict(params[0])
340     extra_args = params[1]
341     return backend.StartInstance(instance, extra_args)
342
343   @staticmethod
344   def perspective_instance_reboot(params):
345     """Reboot an instance.
346
347     """
348     instance = objects.Instance.FromDict(params[0])
349     reboot_type = params[1]
350     extra_args = params[2]
351     return backend.RebootInstance(instance, reboot_type, extra_args)
352
353   @staticmethod
354   def perspective_instance_info(params):
355     """Query instance information.
356
357     """
358     return backend.GetInstanceInfo(params[0])
359
360   @staticmethod
361   def perspective_all_instances_info(params):
362     """Query information about all instances.
363
364     """
365     return backend.GetAllInstancesInfo()
366
367   @staticmethod
368   def perspective_instance_list(params):
369     """Query the list of running instances.
370
371     """
372     return backend.GetInstanceList()
373
374   # node --------------------------
375
376   @staticmethod
377   def perspective_node_tcp_ping(params):
378     """Do a TcpPing on the remote node.
379
380     """
381     return utils.TcpPing(params[1], params[2], timeout=params[3],
382                          live_port_needed=params[4], source=params[0])
383
384   @staticmethod
385   def perspective_node_info(params):
386     """Query node information.
387
388     """
389     vgname = params[0]
390     return backend.GetNodeInfo(vgname)
391
392   @staticmethod
393   def perspective_node_add(params):
394     """Complete the registration of this node in the cluster.
395
396     """
397     return backend.AddNode(params[0], params[1], params[2],
398                            params[3], params[4], params[5])
399
400   @staticmethod
401   def perspective_node_verify(params):
402     """Run a verify sequence on this node.
403
404     """
405     return backend.VerifyNode(params[0])
406
407   @staticmethod
408   def perspective_node_start_master(params):
409     """Promote this node to master status.
410
411     """
412     return backend.StartMaster()
413
414   @staticmethod
415   def perspective_node_stop_master(params):
416     """Demote this node from master status.
417
418     """
419     return backend.StopMaster()
420
421   @staticmethod
422   def perspective_node_leave_cluster(params):
423     """Cleanup after leaving a cluster.
424
425     """
426     return backend.LeaveCluster()
427
428   @staticmethod
429   def perspective_node_volumes(params):
430     """Query the list of all logical volume groups.
431
432     """
433     return backend.NodeVolumes()
434
435   # cluster --------------------------
436
437   @staticmethod
438   def perspective_version(params):
439     """Query version information.
440
441     """
442     return constants.PROTOCOL_VERSION
443
444   @staticmethod
445   def perspective_upload_file(params):
446     """Upload a file.
447
448     Note that the backend implementation imposes strict rules on which
449     files are accepted.
450
451     """
452     return backend.UploadFile(*params)
453
454
455   # os -----------------------
456
457   @staticmethod
458   def perspective_os_diagnose(params):
459     """Query detailed information about existing OSes.
460
461     """
462     return [os.ToDict() for os in backend.DiagnoseOS()]
463
464   @staticmethod
465   def perspective_os_get(params):
466     """Query information about a given OS.
467
468     """
469     name = params[0]
470     try:
471       os_obj = backend.OSFromDisk(name)
472     except errors.InvalidOS, err:
473       os_obj = objects.OS.FromInvalidOS(err)
474     return os_obj.ToDict()
475
476   # hooks -----------------------
477
478   @staticmethod
479   def perspective_hooks_runner(params):
480     """Run hook scripts.
481
482     """
483     hpath, phase, env = params
484     hr = backend.HooksRunner()
485     return hr.RunHooks(hpath, phase, env)
486
487   # iallocator -----------------
488
489   @staticmethod
490   def perspective_iallocator_runner(params):
491     """Run an iallocator script.
492
493     """
494     name, idata = params
495     iar = backend.IAllocatorRunner()
496     return iar.Run(name, idata)
497
498   # test -----------------------
499
500   @staticmethod
501   def perspective_test_delay(params):
502     """Run test delay.
503
504     """
505     duration = params[0]
506     return utils.TestDelay(duration)
507
508   @staticmethod
509   def perspective_file_storage_dir_create(params):
510     """Create the file storage directory.
511
512     """
513     file_storage_dir = params[0]
514     return backend.CreateFileStorageDir(file_storage_dir)
515
516   @staticmethod
517   def perspective_file_storage_dir_remove(params):
518     """Remove the file storage directory.
519
520     """
521     file_storage_dir = params[0]
522     return backend.RemoveFileStorageDir(file_storage_dir)
523
524   @staticmethod
525   def perspective_file_storage_dir_rename(params):
526     """Rename the file storage directory.
527
528     """
529     old_file_storage_dir = params[0]
530     new_file_storage_dir = params[1]
531     return backend.RenameFileStorageDir(old_file_storage_dir,
532                                         new_file_storage_dir)
533
534
535 def ParseOptions():
536   """Parse the command line options.
537
538   Returns:
539     (options, args) as from OptionParser.parse_args()
540
541   """
542   parser = OptionParser(description="Ganeti node daemon",
543                         usage="%prog [-f] [-d]",
544                         version="%%prog (ganeti) %s" %
545                         constants.RELEASE_VERSION)
546
547   parser.add_option("-f", "--foreground", dest="fork",
548                     help="Don't detach from the current terminal",
549                     default=True, action="store_false")
550   parser.add_option("-d", "--debug", dest="debug",
551                     help="Enable some debug messages",
552                     default=False, action="store_true")
553   options, args = parser.parse_args()
554   return options, args
555
556
557 def main():
558   """Main function for the node daemon.
559
560   """
561   options, args = ParseOptions()
562   utils.debug = options.debug
563   for fname in (constants.SSL_CERT_FILE,):
564     if not os.path.isfile(fname):
565       print "config %s not there, will not run." % fname
566       sys.exit(5)
567
568   try:
569     ss = ssconf.SimpleStore()
570     port = ss.GetNodeDaemonPort()
571     pwdata = ss.GetNodeDaemonPassword()
572   except errors.ConfigurationError, err:
573     print "Cluster configuration incomplete: '%s'" % str(err)
574     sys.exit(5)
575
576   # create /var/run/ganeti if not existing, in order to take care of
577   # tmpfs /var/run
578   if not os.path.exists(constants.BDEV_CACHE_DIR):
579     try:
580       os.mkdir(constants.BDEV_CACHE_DIR, 0755)
581     except EnvironmentError, err:
582       if err.errno != errno.EEXIST:
583         print ("Node setup wrong, cannot create directory %s: %s" %
584                (constants.BDEV_CACHE_DIR, err))
585         sys.exit(5)
586   if not os.path.isdir(constants.BDEV_CACHE_DIR):
587     print ("Node setup wrong, %s is not a directory" %
588            constants.BDEV_CACHE_DIR)
589     sys.exit(5)
590
591   # become a daemon
592   if options.fork:
593     utils.Daemonize(logfile=constants.LOG_NODESERVER)
594
595   logger.SetupLogging(twisted_workaround=True, debug=options.debug,
596                       program="ganeti-noded")
597
598   httpd = BaseHTTPServer.HTTPServer(('', port), ServerObject)
599   httpd.serve_forever()
600
601
602 if __name__ == '__main__':
603   main()