Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-noded @ 4c8ba8b3

History | View | Annotate | Download (16 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti node daemon"""
23

    
24
# functions in this module need to have a given name structure, so:
25
# pylint: disable-msg=C0103
26

    
27
import os
28
import sys
29
import traceback
30
import BaseHTTPServer
31
import simplejson
32
import errno
33

    
34
from optparse import OptionParser
35

    
36

    
37
from ganeti import backend
38
from ganeti import logger
39
from ganeti import constants
40
from ganeti import objects
41
from ganeti import errors
42
from ganeti import ssconf
43
from ganeti import utils
44

    
45

    
46
class ServerObject(BaseHTTPServer.BaseHTTPRequestHandler):
47
  """The server implementation.
48

    
49
  This class holds all methods exposed over the RPC interface.
50

    
51
  """
52
  def do_PUT(self):
53
    """Handle a post request.
54

    
55
    """
56
    path = self.path
57
    if path.startswith("/"):
58
      path = path[1:]
59
    mname = "perspective_%s" % path
60
    if not hasattr(self, mname):
61
      self.send_error(404)
62
      return False
63

    
64
    method = getattr(self, mname)
65
    try:
66
      body_length = int(self.headers.get('Content-Length', '0'))
67
    except ValueError:
68
      self.send_error(400, 'No Content-Length header or invalid format')
69
      return False
70

    
71
    try:
72
      body = self.rfile.read(body_length)
73
    except socket.error, err:
74
      logger.Error("Socket error while reading: %s" % str(err))
75
      return
76
    try:
77
      params = simplejson.loads(body)
78
      result = method(params)
79
      payload = simplejson.dumps(result)
80
    except Exception, err:
81
      self.send_error(500, "Error: %s" % str(err))
82
      return False
83
    self.send_response(200)
84
    self.send_header('Content-Length', str(len(payload)))
85
    self.end_headers()
86
    self.wfile.write(payload)
87
    return True
88

    
89
  def log_message(self, format, *args):
90
    """Log a request to the log.
91

    
92
    This is the same as the parent, we just log somewhere else.
93

    
94
    """
95
    msg = ("%s - - [%s] %s" %
96
           (self.address_string(),
97
            self.log_date_time_string(),
98
            format % args))
99
    logger.Debug(msg)
100

    
101
  # the new block devices  --------------------------
102

    
103
  @staticmethod
104
  def perspective_blockdev_create(params):
105
    """Create a block device.
106

    
107
    """
108
    bdev_s, size, owner, on_primary, info = params
109
    bdev = objects.Disk.FromDict(bdev_s)
110
    if bdev is None:
111
      raise ValueError("can't unserialize data!")
112
    return backend.CreateBlockDevice(bdev, size, owner, on_primary, info)
113

    
114
  @staticmethod
115
  def perspective_blockdev_remove(params):
116
    """Remove a block device.
117

    
118
    """
119
    bdev_s = params[0]
120
    bdev = objects.Disk.FromDict(bdev_s)
121
    return backend.RemoveBlockDevice(bdev)
122

    
123
  @staticmethod
124
  def perspective_blockdev_rename(params):
125
    """Remove a block device.
126

    
127
    """
128
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
129
    return backend.RenameBlockDevices(devlist)
130

    
131
  @staticmethod
132
  def perspective_blockdev_assemble(params):
133
    """Assemble a block device.
134

    
135
    """
136
    bdev_s, owner, on_primary = params
137
    bdev = objects.Disk.FromDict(bdev_s)
138
    if bdev is None:
139
      raise ValueError("can't unserialize data!")
140
    return backend.AssembleBlockDevice(bdev, owner, on_primary)
141

    
142
  @staticmethod
143
  def perspective_blockdev_shutdown(params):
144
    """Shutdown a block device.
145

    
146
    """
147
    bdev_s = params[0]
148
    bdev = objects.Disk.FromDict(bdev_s)
149
    if bdev is None:
150
      raise ValueError("can't unserialize data!")
151
    return backend.ShutdownBlockDevice(bdev)
152

    
153
  @staticmethod
154
  def perspective_blockdev_addchildren(params):
155
    """Add a child to a mirror device.
156

    
157
    Note: this is only valid for mirror devices. It's the caller's duty
158
    to send a correct disk, otherwise we raise an error.
159

    
160
    """
161
    bdev_s, ndev_s = params
162
    bdev = objects.Disk.FromDict(bdev_s)
163
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
164
    if bdev is None or ndevs.count(None) > 0:
165
      raise ValueError("can't unserialize data!")
166
    return backend.MirrorAddChildren(bdev, ndevs)
167

    
168
  @staticmethod
169
  def perspective_blockdev_removechildren(params):
170
    """Remove a child from a mirror device.
171

    
172
    This is only valid for mirror devices, of course. It's the callers
173
    duty to send a correct disk, otherwise we raise an error.
174

    
175
    """
176
    bdev_s, ndev_s = params
177
    bdev = objects.Disk.FromDict(bdev_s)
178
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
179
    if bdev is None or ndevs.count(None) > 0:
180
      raise ValueError("can't unserialize data!")
181
    return backend.MirrorRemoveChildren(bdev, ndevs)
182

    
183
  @staticmethod
184
  def perspective_blockdev_getmirrorstatus(params):
185
    """Return the mirror status for a list of disks.
186

    
187
    """
188
    disks = [objects.Disk.FromDict(dsk_s)
189
            for dsk_s in params]
190
    return backend.GetMirrorStatus(disks)
191

    
192
  @staticmethod
193
  def perspective_blockdev_find(params):
194
    """Expose the FindBlockDevice functionality for a disk.
195

    
196
    This will try to find but not activate a disk.
197

    
198
    """
199
    disk = objects.Disk.FromDict(params[0])
200
    return backend.FindBlockDevice(disk)
201

    
202
  @staticmethod
203
  def perspective_blockdev_snapshot(params):
204
    """Create a snapshot device.
205

    
206
    Note that this is only valid for LVM disks, if we get passed
207
    something else we raise an exception. The snapshot device can be
208
    remove by calling the generic block device remove call.
209

    
210
    """
211
    cfbd = objects.Disk.FromDict(params[0])
212
    return backend.SnapshotBlockDevice(cfbd)
213

    
214
  @staticmethod
215
  def perspective_blockdev_grow(params):
216
    """Grow a stack of devices.
217

    
218
    """
219
    cfbd = objects.Disk.FromDict(params[0])
220
    amount = params[1]
221
    return backend.GrowBlockDevice(cfbd, amount)
222

    
223
  # export/import  --------------------------
224

    
225
  @staticmethod
226
  def perspective_snapshot_export(params):
227
    """Export a given snapshot.
228

    
229
    """
230
    disk = objects.Disk.FromDict(params[0])
231
    dest_node = params[1]
232
    instance = objects.Instance.FromDict(params[2])
233
    return backend.ExportSnapshot(disk, dest_node, instance)
234

    
235
  @staticmethod
236
  def perspective_finalize_export(params):
237
    """Expose the finalize export functionality.
238

    
239
    """
240
    instance = objects.Instance.FromDict(params[0])
241
    snap_disks = [objects.Disk.FromDict(str_data)
242
                  for str_data in params[1]]
243
    return backend.FinalizeExport(instance, snap_disks)
244

    
245
  @staticmethod
246
  def perspective_export_info(params):
247
    """Query information about an existing export on this node.
248

    
249
    The given path may not contain an export, in which case we return
250
    None.
251

    
252
    """
253
    path = params[0]
254
    einfo = backend.ExportInfo(path)
255
    if einfo is None:
256
      return einfo
257
    return einfo.Dumps()
258

    
259
  @staticmethod
260
  def perspective_export_list(params):
261
    """List the available exports on this node.
262

    
263
    Note that as opposed to export_info, which may query data about an
264
    export in any path, this only queries the standard Ganeti path
265
    (constants.EXPORT_DIR).
266

    
267
    """
268
    return backend.ListExports()
269

    
270
  @staticmethod
271
  def perspective_export_remove(params):
272
    """Remove an export.
273

    
274
    """
275
    export = params[0]
276
    return backend.RemoveExport(export)
277

    
278
  # volume  --------------------------
279

    
280
  @staticmethod
281
  def perspective_volume_list(params):
282
    """Query the list of logical volumes in a given volume group.
283

    
284
    """
285
    vgname = params[0]
286
    return backend.GetVolumeList(vgname)
287

    
288
  @staticmethod
289
  def perspective_vg_list(params):
290
    """Query the list of volume groups.
291

    
292
    """
293
    return backend.ListVolumeGroups()
294

    
295
  # bridge  --------------------------
296

    
297
  @staticmethod
298
  def perspective_bridges_exist(params):
299
    """Check if all bridges given exist on this node.
300

    
301
    """
302
    bridges_list = params[0]
303
    return backend.BridgesExist(bridges_list)
304

    
305
  # instance  --------------------------
306

    
307
  @staticmethod
308
  def perspective_instance_os_add(params):
309
    """Install an OS on a given instance.
310

    
311
    """
312
    inst_s, os_disk, swap_disk = params
313
    inst = objects.Instance.FromDict(inst_s)
314
    return backend.AddOSToInstance(inst, os_disk, swap_disk)
315

    
316
  @staticmethod
317
  def perspective_instance_run_rename(params):
318
    """Runs the OS rename script for an instance.
319

    
320
    """
321
    inst_s, old_name, os_disk, swap_disk = params
322
    inst = objects.Instance.FromDict(inst_s)
323
    return backend.RunRenameInstance(inst, old_name, os_disk, swap_disk)
324

    
325
  @staticmethod
326
  def perspective_instance_os_import(params):
327
    """Run the import function of an OS onto a given instance.
328

    
329
    """
330
    inst_s, os_disk, swap_disk, src_node, src_image = params
331
    inst = objects.Instance.FromDict(inst_s)
332
    return backend.ImportOSIntoInstance(inst, os_disk, swap_disk,
333
                                        src_node, src_image)
334

    
335
  @staticmethod
336
  def perspective_instance_shutdown(params):
337
    """Shutdown an instance.
338

    
339
    """
340
    instance = objects.Instance.FromDict(params[0])
341
    return backend.ShutdownInstance(instance)
342

    
343
  @staticmethod
344
  def perspective_instance_start(params):
345
    """Start an instance.
346

    
347
    """
348
    instance = objects.Instance.FromDict(params[0])
349
    extra_args = params[1]
350
    return backend.StartInstance(instance, extra_args)
351

    
352
  @staticmethod
353
  def perspective_instance_migrate(params):
354
    """Migrates an instance.
355

    
356
    """
357
    instance, target, live = params
358
    return backend.MigrateInstance(instance, target, live)
359

    
360
  @staticmethod
361
  def perspective_instance_reboot(params):
362
    """Reboot an instance.
363

    
364
    """
365
    instance = objects.Instance.FromDict(params[0])
366
    reboot_type = params[1]
367
    extra_args = params[2]
368
    return backend.RebootInstance(instance, reboot_type, extra_args)
369

    
370
  @staticmethod
371
  def perspective_instance_info(params):
372
    """Query instance information.
373

    
374
    """
375
    return backend.GetInstanceInfo(params[0])
376

    
377
  @staticmethod
378
  def perspective_all_instances_info(params):
379
    """Query information about all instances.
380

    
381
    """
382
    return backend.GetAllInstancesInfo()
383

    
384
  @staticmethod
385
  def perspective_instance_list(params):
386
    """Query the list of running instances.
387

    
388
    """
389
    return backend.GetInstanceList()
390

    
391
  # node --------------------------
392

    
393
  @staticmethod
394
  def perspective_node_tcp_ping(params):
395
    """Do a TcpPing on the remote node.
396

    
397
    """
398
    return utils.TcpPing(params[1], params[2], timeout=params[3],
399
                         live_port_needed=params[4], source=params[0])
400

    
401
  @staticmethod
402
  def perspective_node_info(params):
403
    """Query node information.
404

    
405
    """
406
    vgname = params[0]
407
    return backend.GetNodeInfo(vgname)
408

    
409
  @staticmethod
410
  def perspective_node_add(params):
411
    """Complete the registration of this node in the cluster.
412

    
413
    """
414
    return backend.AddNode(params[0], params[1], params[2],
415
                           params[3], params[4], params[5])
416

    
417
  @staticmethod
418
  def perspective_node_verify(params):
419
    """Run a verify sequence on this node.
420

    
421
    """
422
    return backend.VerifyNode(params[0])
423

    
424
  @staticmethod
425
  def perspective_node_start_master(params):
426
    """Promote this node to master status.
427

    
428
    """
429
    return backend.StartMaster()
430

    
431
  @staticmethod
432
  def perspective_node_stop_master(params):
433
    """Demote this node from master status.
434

    
435
    """
436
    return backend.StopMaster()
437

    
438
  @staticmethod
439
  def perspective_node_leave_cluster(params):
440
    """Cleanup after leaving a cluster.
441

    
442
    """
443
    return backend.LeaveCluster()
444

    
445
  @staticmethod
446
  def perspective_node_volumes(params):
447
    """Query the list of all logical volume groups.
448

    
449
    """
450
    return backend.NodeVolumes()
451

    
452
  # cluster --------------------------
453

    
454
  @staticmethod
455
  def perspective_version(params):
456
    """Query version information.
457

    
458
    """
459
    return constants.PROTOCOL_VERSION
460

    
461
  @staticmethod
462
  def perspective_upload_file(params):
463
    """Upload a file.
464

    
465
    Note that the backend implementation imposes strict rules on which
466
    files are accepted.
467

    
468
    """
469
    return backend.UploadFile(*params)
470

    
471

    
472
  # os -----------------------
473

    
474
  @staticmethod
475
  def perspective_os_diagnose(params):
476
    """Query detailed information about existing OSes.
477

    
478
    """
479
    return [os.ToDict() for os in backend.DiagnoseOS()]
480

    
481
  @staticmethod
482
  def perspective_os_get(params):
483
    """Query information about a given OS.
484

    
485
    """
486
    name = params[0]
487
    try:
488
      os_obj = backend.OSFromDisk(name)
489
    except errors.InvalidOS, err:
490
      os_obj = objects.OS.FromInvalidOS(err)
491
    return os_obj.ToDict()
492

    
493
  # hooks -----------------------
494

    
495
  @staticmethod
496
  def perspective_hooks_runner(params):
497
    """Run hook scripts.
498

    
499
    """
500
    hpath, phase, env = params
501
    hr = backend.HooksRunner()
502
    return hr.RunHooks(hpath, phase, env)
503

    
504
  # iallocator -----------------
505

    
506
  @staticmethod
507
  def perspective_iallocator_runner(params):
508
    """Run an iallocator script.
509

    
510
    """
511
    name, idata = params
512
    iar = backend.IAllocatorRunner()
513
    return iar.Run(name, idata)
514

    
515
  # test -----------------------
516

    
517
  @staticmethod
518
  def perspective_test_delay(params):
519
    """Run test delay.
520

    
521
    """
522
    duration = params[0]
523
    return utils.TestDelay(duration)
524

    
525
  @staticmethod
526
  def perspective_file_storage_dir_create(params):
527
    """Create the file storage directory.
528

    
529
    """
530
    file_storage_dir = params[0]
531
    return backend.CreateFileStorageDir(file_storage_dir)
532

    
533
  @staticmethod
534
  def perspective_file_storage_dir_remove(params):
535
    """Remove the file storage directory.
536

    
537
    """
538
    file_storage_dir = params[0]
539
    return backend.RemoveFileStorageDir(file_storage_dir)
540

    
541
  @staticmethod
542
  def perspective_file_storage_dir_rename(params):
543
    """Rename the file storage directory.
544

    
545
    """
546
    old_file_storage_dir = params[0]
547
    new_file_storage_dir = params[1]
548
    return backend.RenameFileStorageDir(old_file_storage_dir,
549
                                        new_file_storage_dir)
550

    
551

    
552
def ParseOptions():
553
  """Parse the command line options.
554

    
555
  Returns:
556
    (options, args) as from OptionParser.parse_args()
557

    
558
  """
559
  parser = OptionParser(description="Ganeti node daemon",
560
                        usage="%prog [-f] [-d]",
561
                        version="%%prog (ganeti) %s" %
562
                        constants.RELEASE_VERSION)
563

    
564
  parser.add_option("-f", "--foreground", dest="fork",
565
                    help="Don't detach from the current terminal",
566
                    default=True, action="store_false")
567
  parser.add_option("-d", "--debug", dest="debug",
568
                    help="Enable some debug messages",
569
                    default=False, action="store_true")
570
  options, args = parser.parse_args()
571
  return options, args
572

    
573

    
574
def main():
575
  """Main function for the node daemon.
576

    
577
  """
578
  options, args = ParseOptions()
579
  utils.debug = options.debug
580
  for fname in (constants.SSL_CERT_FILE,):
581
    if not os.path.isfile(fname):
582
      print "config %s not there, will not run." % fname
583
      sys.exit(5)
584

    
585
  try:
586
    ss = ssconf.SimpleStore()
587
    port = ss.GetNodeDaemonPort()
588
    pwdata = ss.GetNodeDaemonPassword()
589
  except errors.ConfigurationError, err:
590
    print "Cluster configuration incomplete: '%s'" % str(err)
591
    sys.exit(5)
592

    
593
  # create /var/run/ganeti if not existing, in order to take care of
594
  # tmpfs /var/run
595
  if not os.path.exists(constants.BDEV_CACHE_DIR):
596
    try:
597
      os.mkdir(constants.BDEV_CACHE_DIR, 0755)
598
    except EnvironmentError, err:
599
      if err.errno != errno.EEXIST:
600
        print ("Node setup wrong, cannot create directory %s: %s" %
601
               (constants.BDEV_CACHE_DIR, err))
602
        sys.exit(5)
603
  if not os.path.isdir(constants.BDEV_CACHE_DIR):
604
    print ("Node setup wrong, %s is not a directory" %
605
           constants.BDEV_CACHE_DIR)
606
    sys.exit(5)
607

    
608
  # become a daemon
609
  if options.fork:
610
    utils.Daemonize(logfile=constants.LOG_NODESERVER)
611

    
612
  logger.SetupLogging(program="ganeti-noded", debug=options.debug)
613

    
614
  httpd = BaseHTTPServer.HTTPServer(('', port), ServerObject)
615
  httpd.serve_forever()
616

    
617

    
618
if __name__ == '__main__':
619
  main()