Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-noded @ 3b3db8fd

History | View | Annotate | Download (16.3 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti node daemon"""
23

    
24
# functions in this module need to have a given name structure, so:
25
# pylint: disable-msg=C0103
26

    
27
import os
28
import sys
29
import traceback
30
import BaseHTTPServer
31
import simplejson
32
import errno
33

    
34
from optparse import OptionParser
35

    
36

    
37
from ganeti import backend
38
from ganeti import logger
39
from ganeti import constants
40
from ganeti import objects
41
from ganeti import errors
42
from ganeti import ssconf
43
from ganeti import utils
44

    
45

    
46
class ServerObject(BaseHTTPServer.BaseHTTPRequestHandler):
47
  """The server implementation.
48

    
49
  This class holds all methods exposed over the RPC interface.
50

    
51
  """
52
  def do_PUT(self):
53
    """Handle a post request.
54

    
55
    """
56
    path = self.path
57
    if path.startswith("/"):
58
      path = path[1:]
59
    mname = "perspective_%s" % path
60
    if not hasattr(self, mname):
61
      self.send_error(404)
62
      return False
63

    
64
    method = getattr(self, mname)
65
    try:
66
      body_length = int(self.headers.get('Content-Length', '0'))
67
    except ValueError:
68
      self.send_error(400, 'No Content-Length header or invalid format')
69
      return False
70

    
71
    try:
72
      body = self.rfile.read(body_length)
73
    except socket.error, err:
74
      logger.Error("Socket error while reading: %s" % str(err))
75
      return
76
    try:
77
      params = simplejson.loads(body)
78
      result = method(params)
79
      payload = simplejson.dumps(result)
80
    except Exception, err:
81
      self.send_error(500, "Error: %s" % str(err))
82
      return False
83
    self.send_response(200)
84
    self.send_header('Content-Length', str(len(payload)))
85
    self.end_headers()
86
    self.wfile.write(payload)
87
    return True
88

    
89
  def log_message(self, format, *args):
90
    """Log a request to the log.
91

    
92
    This is the same as the parent, we just log somewhere else.
93

    
94
    """
95
    msg = ("%s - - [%s] %s" %
96
           (self.address_string(),
97
            self.log_date_time_string(),
98
            format % args))
99
    logger.Debug(msg)
100

    
101
  # the new block devices  --------------------------
102

    
103
  @staticmethod
104
  def perspective_blockdev_create(params):
105
    """Create a block device.
106

    
107
    """
108
    bdev_s, size, owner, on_primary, info = params
109
    bdev = objects.Disk.FromDict(bdev_s)
110
    if bdev is None:
111
      raise ValueError("can't unserialize data!")
112
    return backend.CreateBlockDevice(bdev, size, owner, on_primary, info)
113

    
114
  @staticmethod
115
  def perspective_blockdev_remove(params):
116
    """Remove a block device.
117

    
118
    """
119
    bdev_s = params[0]
120
    bdev = objects.Disk.FromDict(bdev_s)
121
    return backend.RemoveBlockDevice(bdev)
122

    
123
  @staticmethod
124
  def perspective_blockdev_rename(params):
125
    """Remove a block device.
126

    
127
    """
128
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
129
    return backend.RenameBlockDevices(devlist)
130

    
131
  @staticmethod
132
  def perspective_blockdev_assemble(params):
133
    """Assemble a block device.
134

    
135
    """
136
    bdev_s, owner, on_primary = params
137
    bdev = objects.Disk.FromDict(bdev_s)
138
    if bdev is None:
139
      raise ValueError("can't unserialize data!")
140
    return backend.AssembleBlockDevice(bdev, owner, on_primary)
141

    
142
  @staticmethod
143
  def perspective_blockdev_shutdown(params):
144
    """Shutdown a block device.
145

    
146
    """
147
    bdev_s = params[0]
148
    bdev = objects.Disk.FromDict(bdev_s)
149
    if bdev is None:
150
      raise ValueError("can't unserialize data!")
151
    return backend.ShutdownBlockDevice(bdev)
152

    
153
  @staticmethod
154
  def perspective_blockdev_addchildren(params):
155
    """Add a child to a mirror device.
156

    
157
    Note: this is only valid for mirror devices. It's the caller's duty
158
    to send a correct disk, otherwise we raise an error.
159

    
160
    """
161
    bdev_s, ndev_s = params
162
    bdev = objects.Disk.FromDict(bdev_s)
163
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
164
    if bdev is None or ndevs.count(None) > 0:
165
      raise ValueError("can't unserialize data!")
166
    return backend.MirrorAddChildren(bdev, ndevs)
167

    
168
  @staticmethod
169
  def perspective_blockdev_removechildren(params):
170
    """Remove a child from a mirror device.
171

    
172
    This is only valid for mirror devices, of course. It's the callers
173
    duty to send a correct disk, otherwise we raise an error.
174

    
175
    """
176
    bdev_s, ndev_s = params
177
    bdev = objects.Disk.FromDict(bdev_s)
178
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
179
    if bdev is None or ndevs.count(None) > 0:
180
      raise ValueError("can't unserialize data!")
181
    return backend.MirrorRemoveChildren(bdev, ndevs)
182

    
183
  @staticmethod
184
  def perspective_blockdev_getmirrorstatus(params):
185
    """Return the mirror status for a list of disks.
186

    
187
    """
188
    disks = [objects.Disk.FromDict(dsk_s)
189
            for dsk_s in params]
190
    return backend.GetMirrorStatus(disks)
191

    
192
  @staticmethod
193
  def perspective_blockdev_find(params):
194
    """Expose the FindBlockDevice functionality for a disk.
195

    
196
    This will try to find but not activate a disk.
197

    
198
    """
199
    disk = objects.Disk.FromDict(params[0])
200
    return backend.FindBlockDevice(disk)
201

    
202
  @staticmethod
203
  def perspective_blockdev_snapshot(params):
204
    """Create a snapshot device.
205

    
206
    Note that this is only valid for LVM disks, if we get passed
207
    something else we raise an exception. The snapshot device can be
208
    remove by calling the generic block device remove call.
209

    
210
    """
211
    cfbd = objects.Disk.FromDict(params[0])
212
    return backend.SnapshotBlockDevice(cfbd)
213

    
214
  @staticmethod
215
  def perspective_blockdev_grow(params):
216
    """Grow a stack of devices.
217

    
218
    """
219
    cfbd = objects.Disk.FromDict(params[0])
220
    amount = params[1]
221
    return backend.GrowBlockDevice(cfbd, amount)
222

    
223
  @staticmethod
224
  def perspective_blockdev_close(params):
225
    """Closes the given block devices.
226

    
227
    """
228
    disks = [objects.Disk.FromDict(cf) for cf in params]
229
    return backend.CloseBlockDevices(disks)
230

    
231
  # export/import  --------------------------
232

    
233
  @staticmethod
234
  def perspective_snapshot_export(params):
235
    """Export a given snapshot.
236

    
237
    """
238
    disk = objects.Disk.FromDict(params[0])
239
    dest_node = params[1]
240
    instance = objects.Instance.FromDict(params[2])
241
    return backend.ExportSnapshot(disk, dest_node, instance)
242

    
243
  @staticmethod
244
  def perspective_finalize_export(params):
245
    """Expose the finalize export functionality.
246

    
247
    """
248
    instance = objects.Instance.FromDict(params[0])
249
    snap_disks = [objects.Disk.FromDict(str_data)
250
                  for str_data in params[1]]
251
    return backend.FinalizeExport(instance, snap_disks)
252

    
253
  @staticmethod
254
  def perspective_export_info(params):
255
    """Query information about an existing export on this node.
256

    
257
    The given path may not contain an export, in which case we return
258
    None.
259

    
260
    """
261
    path = params[0]
262
    einfo = backend.ExportInfo(path)
263
    if einfo is None:
264
      return einfo
265
    return einfo.Dumps()
266

    
267
  @staticmethod
268
  def perspective_export_list(params):
269
    """List the available exports on this node.
270

    
271
    Note that as opposed to export_info, which may query data about an
272
    export in any path, this only queries the standard Ganeti path
273
    (constants.EXPORT_DIR).
274

    
275
    """
276
    return backend.ListExports()
277

    
278
  @staticmethod
279
  def perspective_export_remove(params):
280
    """Remove an export.
281

    
282
    """
283
    export = params[0]
284
    return backend.RemoveExport(export)
285

    
286
  # volume  --------------------------
287

    
288
  @staticmethod
289
  def perspective_volume_list(params):
290
    """Query the list of logical volumes in a given volume group.
291

    
292
    """
293
    vgname = params[0]
294
    return backend.GetVolumeList(vgname)
295

    
296
  @staticmethod
297
  def perspective_vg_list(params):
298
    """Query the list of volume groups.
299

    
300
    """
301
    return backend.ListVolumeGroups()
302

    
303
  # bridge  --------------------------
304

    
305
  @staticmethod
306
  def perspective_bridges_exist(params):
307
    """Check if all bridges given exist on this node.
308

    
309
    """
310
    bridges_list = params[0]
311
    return backend.BridgesExist(bridges_list)
312

    
313
  # instance  --------------------------
314

    
315
  @staticmethod
316
  def perspective_instance_os_add(params):
317
    """Install an OS on a given instance.
318

    
319
    """
320
    inst_s, os_disk, swap_disk = params
321
    inst = objects.Instance.FromDict(inst_s)
322
    return backend.AddOSToInstance(inst, os_disk, swap_disk)
323

    
324
  @staticmethod
325
  def perspective_instance_run_rename(params):
326
    """Runs the OS rename script for an instance.
327

    
328
    """
329
    inst_s, old_name, os_disk, swap_disk = params
330
    inst = objects.Instance.FromDict(inst_s)
331
    return backend.RunRenameInstance(inst, old_name, os_disk, swap_disk)
332

    
333
  @staticmethod
334
  def perspective_instance_os_import(params):
335
    """Run the import function of an OS onto a given instance.
336

    
337
    """
338
    inst_s, os_disk, swap_disk, src_node, src_image = params
339
    inst = objects.Instance.FromDict(inst_s)
340
    return backend.ImportOSIntoInstance(inst, os_disk, swap_disk,
341
                                        src_node, src_image)
342

    
343
  @staticmethod
344
  def perspective_instance_shutdown(params):
345
    """Shutdown an instance.
346

    
347
    """
348
    instance = objects.Instance.FromDict(params[0])
349
    return backend.ShutdownInstance(instance)
350

    
351
  @staticmethod
352
  def perspective_instance_start(params):
353
    """Start an instance.
354

    
355
    """
356
    instance = objects.Instance.FromDict(params[0])
357
    extra_args = params[1]
358
    return backend.StartInstance(instance, extra_args)
359

    
360
  @staticmethod
361
  def perspective_instance_migrate(params):
362
    """Migrates an instance.
363

    
364
    """
365
    instance, target, live = params
366
    return backend.MigrateInstance(instance, target, live)
367

    
368
  @staticmethod
369
  def perspective_instance_reboot(params):
370
    """Reboot an instance.
371

    
372
    """
373
    instance = objects.Instance.FromDict(params[0])
374
    reboot_type = params[1]
375
    extra_args = params[2]
376
    return backend.RebootInstance(instance, reboot_type, extra_args)
377

    
378
  @staticmethod
379
  def perspective_instance_info(params):
380
    """Query instance information.
381

    
382
    """
383
    return backend.GetInstanceInfo(params[0])
384

    
385
  @staticmethod
386
  def perspective_all_instances_info(params):
387
    """Query information about all instances.
388

    
389
    """
390
    return backend.GetAllInstancesInfo()
391

    
392
  @staticmethod
393
  def perspective_instance_list(params):
394
    """Query the list of running instances.
395

    
396
    """
397
    return backend.GetInstanceList()
398

    
399
  # node --------------------------
400

    
401
  @staticmethod
402
  def perspective_node_tcp_ping(params):
403
    """Do a TcpPing on the remote node.
404

    
405
    """
406
    return utils.TcpPing(params[1], params[2], timeout=params[3],
407
                         live_port_needed=params[4], source=params[0])
408

    
409
  @staticmethod
410
  def perspective_node_info(params):
411
    """Query node information.
412

    
413
    """
414
    vgname = params[0]
415
    return backend.GetNodeInfo(vgname)
416

    
417
  @staticmethod
418
  def perspective_node_add(params):
419
    """Complete the registration of this node in the cluster.
420

    
421
    """
422
    return backend.AddNode(params[0], params[1], params[2],
423
                           params[3], params[4], params[5])
424

    
425
  @staticmethod
426
  def perspective_node_verify(params):
427
    """Run a verify sequence on this node.
428

    
429
    """
430
    return backend.VerifyNode(params[0])
431

    
432
  @staticmethod
433
  def perspective_node_start_master(params):
434
    """Promote this node to master status.
435

    
436
    """
437
    return backend.StartMaster()
438

    
439
  @staticmethod
440
  def perspective_node_stop_master(params):
441
    """Demote this node from master status.
442

    
443
    """
444
    return backend.StopMaster()
445

    
446
  @staticmethod
447
  def perspective_node_leave_cluster(params):
448
    """Cleanup after leaving a cluster.
449

    
450
    """
451
    return backend.LeaveCluster()
452

    
453
  @staticmethod
454
  def perspective_node_volumes(params):
455
    """Query the list of all logical volume groups.
456

    
457
    """
458
    return backend.NodeVolumes()
459

    
460
  # cluster --------------------------
461

    
462
  @staticmethod
463
  def perspective_version(params):
464
    """Query version information.
465

    
466
    """
467
    return constants.PROTOCOL_VERSION
468

    
469
  @staticmethod
470
  def perspective_upload_file(params):
471
    """Upload a file.
472

    
473
    Note that the backend implementation imposes strict rules on which
474
    files are accepted.
475

    
476
    """
477
    return backend.UploadFile(*params)
478

    
479

    
480
  # os -----------------------
481

    
482
  @staticmethod
483
  def perspective_os_diagnose(params):
484
    """Query detailed information about existing OSes.
485

    
486
    """
487
    return [os.ToDict() for os in backend.DiagnoseOS()]
488

    
489
  @staticmethod
490
  def perspective_os_get(params):
491
    """Query information about a given OS.
492

    
493
    """
494
    name = params[0]
495
    try:
496
      os_obj = backend.OSFromDisk(name)
497
    except errors.InvalidOS, err:
498
      os_obj = objects.OS.FromInvalidOS(err)
499
    return os_obj.ToDict()
500

    
501
  # hooks -----------------------
502

    
503
  @staticmethod
504
  def perspective_hooks_runner(params):
505
    """Run hook scripts.
506

    
507
    """
508
    hpath, phase, env = params
509
    hr = backend.HooksRunner()
510
    return hr.RunHooks(hpath, phase, env)
511

    
512
  # iallocator -----------------
513

    
514
  @staticmethod
515
  def perspective_iallocator_runner(params):
516
    """Run an iallocator script.
517

    
518
    """
519
    name, idata = params
520
    iar = backend.IAllocatorRunner()
521
    return iar.Run(name, idata)
522

    
523
  # test -----------------------
524

    
525
  @staticmethod
526
  def perspective_test_delay(params):
527
    """Run test delay.
528

    
529
    """
530
    duration = params[0]
531
    return utils.TestDelay(duration)
532

    
533
  @staticmethod
534
  def perspective_file_storage_dir_create(params):
535
    """Create the file storage directory.
536

    
537
    """
538
    file_storage_dir = params[0]
539
    return backend.CreateFileStorageDir(file_storage_dir)
540

    
541
  @staticmethod
542
  def perspective_file_storage_dir_remove(params):
543
    """Remove the file storage directory.
544

    
545
    """
546
    file_storage_dir = params[0]
547
    return backend.RemoveFileStorageDir(file_storage_dir)
548

    
549
  @staticmethod
550
  def perspective_file_storage_dir_rename(params):
551
    """Rename the file storage directory.
552

    
553
    """
554
    old_file_storage_dir = params[0]
555
    new_file_storage_dir = params[1]
556
    return backend.RenameFileStorageDir(old_file_storage_dir,
557
                                        new_file_storage_dir)
558

    
559

    
560
def ParseOptions():
561
  """Parse the command line options.
562

    
563
  Returns:
564
    (options, args) as from OptionParser.parse_args()
565

    
566
  """
567
  parser = OptionParser(description="Ganeti node daemon",
568
                        usage="%prog [-f] [-d]",
569
                        version="%%prog (ganeti) %s" %
570
                        constants.RELEASE_VERSION)
571

    
572
  parser.add_option("-f", "--foreground", dest="fork",
573
                    help="Don't detach from the current terminal",
574
                    default=True, action="store_false")
575
  parser.add_option("-d", "--debug", dest="debug",
576
                    help="Enable some debug messages",
577
                    default=False, action="store_true")
578
  options, args = parser.parse_args()
579
  return options, args
580

    
581

    
582
def main():
583
  """Main function for the node daemon.
584

    
585
  """
586
  options, args = ParseOptions()
587
  utils.debug = options.debug
588
  for fname in (constants.SSL_CERT_FILE,):
589
    if not os.path.isfile(fname):
590
      print "config %s not there, will not run." % fname
591
      sys.exit(5)
592

    
593
  try:
594
    ss = ssconf.SimpleStore()
595
    port = ss.GetNodeDaemonPort()
596
    pwdata = ss.GetNodeDaemonPassword()
597
  except errors.ConfigurationError, err:
598
    print "Cluster configuration incomplete: '%s'" % str(err)
599
    sys.exit(5)
600

    
601
  # create /var/run/ganeti if not existing, in order to take care of
602
  # tmpfs /var/run
603
  if not os.path.exists(constants.BDEV_CACHE_DIR):
604
    try:
605
      os.mkdir(constants.BDEV_CACHE_DIR, 0755)
606
    except EnvironmentError, err:
607
      if err.errno != errno.EEXIST:
608
        print ("Node setup wrong, cannot create directory %s: %s" %
609
               (constants.BDEV_CACHE_DIR, err))
610
        sys.exit(5)
611
  if not os.path.isdir(constants.BDEV_CACHE_DIR):
612
    print ("Node setup wrong, %s is not a directory" %
613
           constants.BDEV_CACHE_DIR)
614
    sys.exit(5)
615

    
616
  # become a daemon
617
  if options.fork:
618
    utils.Daemonize(logfile=constants.LOG_NODESERVER)
619

    
620
  logger.SetupLogging(program="ganeti-noded", debug=options.debug)
621

    
622
  global _EXIT_GANETI_NODED
623
  _EXIT_GANETI_NODED = False
624

    
625
  httpd = BaseHTTPServer.HTTPServer(('', port), ServerObject)
626
  while (not _EXIT_GANETI_NODED):
627
    httpd.handle_request()
628

    
629

    
630
if __name__ == '__main__':
631
  main()