Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-noded @ 1df6506c

History | View | Annotate | Download (15.8 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti node daemon"""
23

    
24
# functions in this module need to have a given name structure, so:
25
# pylint: disable-msg=C0103
26

    
27
import os
28
import sys
29
import traceback
30
import errno
31
import logging
32

    
33
from optparse import OptionParser
34

    
35

    
36
from ganeti import backend
37
from ganeti import logger
38
from ganeti import constants
39
from ganeti import objects
40
from ganeti import errors
41
from ganeti import ssconf
42
from ganeti import http
43
from ganeti import utils
44

    
45
_EXIT_GANETI_NODED = False
46

    
47

    
48
class NodeDaemonRequestHandler(http.HTTPRequestHandler):
49
  """The server implementation.
50

    
51
  This class holds all methods exposed over the RPC interface.
52

    
53
  """
54
  def HandleRequest(self):
55
    """Handle a request.
56

    
57
    """
58
    global _EXIT_GANETI_NODED
59

    
60
    if self.command.upper() != "PUT":
61
      raise http.HTTPBadRequest()
62

    
63
    path = self.path
64
    if path.startswith("/"):
65
      path = path[1:]
66

    
67
    method = getattr(self, "perspective_%s" % path, None)
68
    if method is None:
69
      raise httperror.HTTPNotFound()
70

    
71
    try:
72
      return method(self.post_data)
73
    except errors.QuitGanetiException, err:
74
      _EXIT_GANETI_NODED = True
75

    
76
  # the new block devices  --------------------------
77

    
78
  @staticmethod
79
  def perspective_blockdev_create(params):
80
    """Create a block device.
81

    
82
    """
83
    bdev_s, size, owner, on_primary, info = params
84
    bdev = objects.Disk.FromDict(bdev_s)
85
    if bdev is None:
86
      raise ValueError("can't unserialize data!")
87
    return backend.CreateBlockDevice(bdev, size, owner, on_primary, info)
88

    
89
  @staticmethod
90
  def perspective_blockdev_remove(params):
91
    """Remove a block device.
92

    
93
    """
94
    bdev_s = params[0]
95
    bdev = objects.Disk.FromDict(bdev_s)
96
    return backend.RemoveBlockDevice(bdev)
97

    
98
  @staticmethod
99
  def perspective_blockdev_rename(params):
100
    """Remove a block device.
101

    
102
    """
103
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
104
    return backend.RenameBlockDevices(devlist)
105

    
106
  @staticmethod
107
  def perspective_blockdev_assemble(params):
108
    """Assemble a block device.
109

    
110
    """
111
    bdev_s, owner, on_primary = params
112
    bdev = objects.Disk.FromDict(bdev_s)
113
    if bdev is None:
114
      raise ValueError("can't unserialize data!")
115
    return backend.AssembleBlockDevice(bdev, owner, on_primary)
116

    
117
  @staticmethod
118
  def perspective_blockdev_shutdown(params):
119
    """Shutdown a block device.
120

    
121
    """
122
    bdev_s = params[0]
123
    bdev = objects.Disk.FromDict(bdev_s)
124
    if bdev is None:
125
      raise ValueError("can't unserialize data!")
126
    return backend.ShutdownBlockDevice(bdev)
127

    
128
  @staticmethod
129
  def perspective_blockdev_addchildren(params):
130
    """Add a child to a mirror device.
131

    
132
    Note: this is only valid for mirror devices. It's the caller's duty
133
    to send a correct disk, otherwise we raise an error.
134

    
135
    """
136
    bdev_s, ndev_s = params
137
    bdev = objects.Disk.FromDict(bdev_s)
138
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
139
    if bdev is None or ndevs.count(None) > 0:
140
      raise ValueError("can't unserialize data!")
141
    return backend.MirrorAddChildren(bdev, ndevs)
142

    
143
  @staticmethod
144
  def perspective_blockdev_removechildren(params):
145
    """Remove a child from a mirror device.
146

    
147
    This is only valid for mirror devices, of course. It's the callers
148
    duty to send a correct disk, otherwise we raise an error.
149

    
150
    """
151
    bdev_s, ndev_s = params
152
    bdev = objects.Disk.FromDict(bdev_s)
153
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
154
    if bdev is None or ndevs.count(None) > 0:
155
      raise ValueError("can't unserialize data!")
156
    return backend.MirrorRemoveChildren(bdev, ndevs)
157

    
158
  @staticmethod
159
  def perspective_blockdev_getmirrorstatus(params):
160
    """Return the mirror status for a list of disks.
161

    
162
    """
163
    disks = [objects.Disk.FromDict(dsk_s)
164
            for dsk_s in params]
165
    return backend.GetMirrorStatus(disks)
166

    
167
  @staticmethod
168
  def perspective_blockdev_find(params):
169
    """Expose the FindBlockDevice functionality for a disk.
170

    
171
    This will try to find but not activate a disk.
172

    
173
    """
174
    disk = objects.Disk.FromDict(params[0])
175
    return backend.FindBlockDevice(disk)
176

    
177
  @staticmethod
178
  def perspective_blockdev_snapshot(params):
179
    """Create a snapshot device.
180

    
181
    Note that this is only valid for LVM disks, if we get passed
182
    something else we raise an exception. The snapshot device can be
183
    remove by calling the generic block device remove call.
184

    
185
    """
186
    cfbd = objects.Disk.FromDict(params[0])
187
    return backend.SnapshotBlockDevice(cfbd)
188

    
189
  @staticmethod
190
  def perspective_blockdev_grow(params):
191
    """Grow a stack of devices.
192

    
193
    """
194
    cfbd = objects.Disk.FromDict(params[0])
195
    amount = params[1]
196
    return backend.GrowBlockDevice(cfbd, amount)
197

    
198
  @staticmethod
199
  def perspective_blockdev_close(params):
200
    """Closes the given block devices.
201

    
202
    """
203
    disks = [objects.Disk.FromDict(cf) for cf in params]
204
    return backend.CloseBlockDevices(disks)
205

    
206
  # export/import  --------------------------
207

    
208
  @staticmethod
209
  def perspective_snapshot_export(params):
210
    """Export a given snapshot.
211

    
212
    """
213
    disk = objects.Disk.FromDict(params[0])
214
    dest_node = params[1]
215
    instance = objects.Instance.FromDict(params[2])
216
    return backend.ExportSnapshot(disk, dest_node, instance)
217

    
218
  @staticmethod
219
  def perspective_finalize_export(params):
220
    """Expose the finalize export functionality.
221

    
222
    """
223
    instance = objects.Instance.FromDict(params[0])
224
    snap_disks = [objects.Disk.FromDict(str_data)
225
                  for str_data in params[1]]
226
    return backend.FinalizeExport(instance, snap_disks)
227

    
228
  @staticmethod
229
  def perspective_export_info(params):
230
    """Query information about an existing export on this node.
231

    
232
    The given path may not contain an export, in which case we return
233
    None.
234

    
235
    """
236
    path = params[0]
237
    einfo = backend.ExportInfo(path)
238
    if einfo is None:
239
      return einfo
240
    return einfo.Dumps()
241

    
242
  @staticmethod
243
  def perspective_export_list(params):
244
    """List the available exports on this node.
245

    
246
    Note that as opposed to export_info, which may query data about an
247
    export in any path, this only queries the standard Ganeti path
248
    (constants.EXPORT_DIR).
249

    
250
    """
251
    return backend.ListExports()
252

    
253
  @staticmethod
254
  def perspective_export_remove(params):
255
    """Remove an export.
256

    
257
    """
258
    export = params[0]
259
    return backend.RemoveExport(export)
260

    
261
  # volume  --------------------------
262

    
263
  @staticmethod
264
  def perspective_volume_list(params):
265
    """Query the list of logical volumes in a given volume group.
266

    
267
    """
268
    vgname = params[0]
269
    return backend.GetVolumeList(vgname)
270

    
271
  @staticmethod
272
  def perspective_vg_list(params):
273
    """Query the list of volume groups.
274

    
275
    """
276
    return backend.ListVolumeGroups()
277

    
278
  # bridge  --------------------------
279

    
280
  @staticmethod
281
  def perspective_bridges_exist(params):
282
    """Check if all bridges given exist on this node.
283

    
284
    """
285
    bridges_list = params[0]
286
    return backend.BridgesExist(bridges_list)
287

    
288
  # instance  --------------------------
289

    
290
  @staticmethod
291
  def perspective_instance_os_add(params):
292
    """Install an OS on a given instance.
293

    
294
    """
295
    inst_s, os_disk, swap_disk = params
296
    inst = objects.Instance.FromDict(inst_s)
297
    return backend.AddOSToInstance(inst, os_disk, swap_disk)
298

    
299
  @staticmethod
300
  def perspective_instance_run_rename(params):
301
    """Runs the OS rename script for an instance.
302

    
303
    """
304
    inst_s, old_name, os_disk, swap_disk = params
305
    inst = objects.Instance.FromDict(inst_s)
306
    return backend.RunRenameInstance(inst, old_name, os_disk, swap_disk)
307

    
308
  @staticmethod
309
  def perspective_instance_os_import(params):
310
    """Run the import function of an OS onto a given instance.
311

    
312
    """
313
    inst_s, os_disk, swap_disk, src_node, src_image = params
314
    inst = objects.Instance.FromDict(inst_s)
315
    return backend.ImportOSIntoInstance(inst, os_disk, swap_disk,
316
                                        src_node, src_image)
317

    
318
  @staticmethod
319
  def perspective_instance_shutdown(params):
320
    """Shutdown an instance.
321

    
322
    """
323
    instance = objects.Instance.FromDict(params[0])
324
    return backend.ShutdownInstance(instance)
325

    
326
  @staticmethod
327
  def perspective_instance_start(params):
328
    """Start an instance.
329

    
330
    """
331
    instance = objects.Instance.FromDict(params[0])
332
    extra_args = params[1]
333
    return backend.StartInstance(instance, extra_args)
334

    
335
  @staticmethod
336
  def perspective_instance_migrate(params):
337
    """Migrates an instance.
338

    
339
    """
340
    instance, target, live = params
341
    return backend.MigrateInstance(instance, target, live)
342

    
343
  @staticmethod
344
  def perspective_instance_reboot(params):
345
    """Reboot an instance.
346

    
347
    """
348
    instance = objects.Instance.FromDict(params[0])
349
    reboot_type = params[1]
350
    extra_args = params[2]
351
    return backend.RebootInstance(instance, reboot_type, extra_args)
352

    
353
  @staticmethod
354
  def perspective_instance_info(params):
355
    """Query instance information.
356

    
357
    """
358
    return backend.GetInstanceInfo(params[0])
359

    
360
  @staticmethod
361
  def perspective_all_instances_info(params):
362
    """Query information about all instances.
363

    
364
    """
365
    return backend.GetAllInstancesInfo()
366

    
367
  @staticmethod
368
  def perspective_instance_list(params):
369
    """Query the list of running instances.
370

    
371
    """
372
    return backend.GetInstanceList()
373

    
374
  # node --------------------------
375

    
376
  @staticmethod
377
  def perspective_node_tcp_ping(params):
378
    """Do a TcpPing on the remote node.
379

    
380
    """
381
    return utils.TcpPing(params[1], params[2], timeout=params[3],
382
                         live_port_needed=params[4], source=params[0])
383

    
384
  @staticmethod
385
  def perspective_node_info(params):
386
    """Query node information.
387

    
388
    """
389
    vgname = params[0]
390
    return backend.GetNodeInfo(vgname)
391

    
392
  @staticmethod
393
  def perspective_node_add(params):
394
    """Complete the registration of this node in the cluster.
395

    
396
    """
397
    return backend.AddNode(params[0], params[1], params[2],
398
                           params[3], params[4], params[5])
399

    
400
  @staticmethod
401
  def perspective_node_verify(params):
402
    """Run a verify sequence on this node.
403

    
404
    """
405
    return backend.VerifyNode(params[0])
406

    
407
  @staticmethod
408
  def perspective_node_start_master(params):
409
    """Promote this node to master status.
410

    
411
    """
412
    return backend.StartMaster()
413

    
414
  @staticmethod
415
  def perspective_node_stop_master(params):
416
    """Demote this node from master status.
417

    
418
    """
419
    return backend.StopMaster()
420

    
421
  @staticmethod
422
  def perspective_node_leave_cluster(params):
423
    """Cleanup after leaving a cluster.
424

    
425
    """
426
    return backend.LeaveCluster()
427

    
428
  @staticmethod
429
  def perspective_node_volumes(params):
430
    """Query the list of all logical volume groups.
431

    
432
    """
433
    return backend.NodeVolumes()
434

    
435
  # cluster --------------------------
436

    
437
  @staticmethod
438
  def perspective_version(params):
439
    """Query version information.
440

    
441
    """
442
    return constants.PROTOCOL_VERSION
443

    
444
  @staticmethod
445
  def perspective_upload_file(params):
446
    """Upload a file.
447

    
448
    Note that the backend implementation imposes strict rules on which
449
    files are accepted.
450

    
451
    """
452
    return backend.UploadFile(*params)
453

    
454

    
455
  # os -----------------------
456

    
457
  @staticmethod
458
  def perspective_os_diagnose(params):
459
    """Query detailed information about existing OSes.
460

    
461
    """
462
    return [os.ToDict() for os in backend.DiagnoseOS()]
463

    
464
  @staticmethod
465
  def perspective_os_get(params):
466
    """Query information about a given OS.
467

    
468
    """
469
    name = params[0]
470
    try:
471
      os_obj = backend.OSFromDisk(name)
472
    except errors.InvalidOS, err:
473
      os_obj = objects.OS.FromInvalidOS(err)
474
    return os_obj.ToDict()
475

    
476
  # hooks -----------------------
477

    
478
  @staticmethod
479
  def perspective_hooks_runner(params):
480
    """Run hook scripts.
481

    
482
    """
483
    hpath, phase, env = params
484
    hr = backend.HooksRunner()
485
    return hr.RunHooks(hpath, phase, env)
486

    
487
  # iallocator -----------------
488

    
489
  @staticmethod
490
  def perspective_iallocator_runner(params):
491
    """Run an iallocator script.
492

    
493
    """
494
    name, idata = params
495
    iar = backend.IAllocatorRunner()
496
    return iar.Run(name, idata)
497

    
498
  # test -----------------------
499

    
500
  @staticmethod
501
  def perspective_test_delay(params):
502
    """Run test delay.
503

    
504
    """
505
    duration = params[0]
506
    return utils.TestDelay(duration)
507

    
508
  @staticmethod
509
  def perspective_file_storage_dir_create(params):
510
    """Create the file storage directory.
511

    
512
    """
513
    file_storage_dir = params[0]
514
    return backend.CreateFileStorageDir(file_storage_dir)
515

    
516
  @staticmethod
517
  def perspective_file_storage_dir_remove(params):
518
    """Remove the file storage directory.
519

    
520
    """
521
    file_storage_dir = params[0]
522
    return backend.RemoveFileStorageDir(file_storage_dir)
523

    
524
  @staticmethod
525
  def perspective_file_storage_dir_rename(params):
526
    """Rename the file storage directory.
527

    
528
    """
529
    old_file_storage_dir = params[0]
530
    new_file_storage_dir = params[1]
531
    return backend.RenameFileStorageDir(old_file_storage_dir,
532
                                        new_file_storage_dir)
533

    
534

    
535
class NodeDaemonHttpServer(http.HTTPServer):
536
  def __init__(self, server_address):
537
    http.HTTPServer.__init__(self, server_address, NodeDaemonRequestHandler)
538

    
539

    
540
def ParseOptions():
541
  """Parse the command line options.
542

    
543
  Returns:
544
    (options, args) as from OptionParser.parse_args()
545

    
546
  """
547
  parser = OptionParser(description="Ganeti node daemon",
548
                        usage="%prog [-f] [-d]",
549
                        version="%%prog (ganeti) %s" %
550
                        constants.RELEASE_VERSION)
551

    
552
  parser.add_option("-f", "--foreground", dest="fork",
553
                    help="Don't detach from the current terminal",
554
                    default=True, action="store_false")
555
  parser.add_option("-d", "--debug", dest="debug",
556
                    help="Enable some debug messages",
557
                    default=False, action="store_true")
558
  options, args = parser.parse_args()
559
  return options, args
560

    
561

    
562
def main():
563
  """Main function for the node daemon.
564

    
565
  """
566
  options, args = ParseOptions()
567
  utils.debug = options.debug
568
  for fname in (constants.SSL_CERT_FILE,):
569
    if not os.path.isfile(fname):
570
      print "config %s not there, will not run." % fname
571
      sys.exit(5)
572

    
573
  try:
574
    ss = ssconf.SimpleStore()
575
    port = ss.GetNodeDaemonPort()
576
    pwdata = ss.GetNodeDaemonPassword()
577
  except errors.ConfigurationError, err:
578
    print "Cluster configuration incomplete: '%s'" % str(err)
579
    sys.exit(5)
580

    
581
  # create the various SUB_RUN_DIRS, if not existing, so that we handle the
582
  # situation where RUN_DIR is tmpfs
583
  for dir_name in constants.SUB_RUN_DIRS:
584
    if not os.path.exists(dir_name):
585
      try:
586
        os.mkdir(dir_name, 0755)
587
      except EnvironmentError, err:
588
        if err.errno != errno.EEXIST:
589
          print ("Node setup wrong, cannot create directory %s: %s" %
590
                 (dir_name, err))
591
          sys.exit(5)
592
    if not os.path.isdir(dir_name):
593
      print ("Node setup wrong, %s is not a directory" % dir_name)
594
      sys.exit(5)
595

    
596
  # become a daemon
597
  if options.fork:
598
    utils.Daemonize(logfile=constants.LOG_NODESERVER)
599

    
600
  logger.SetupDaemon(logfile=constants.LOG_NODESERVER, debug=options.debug,
601
                     stderr_logging=not options.fork)
602
  logging.info("ganeti node daemon startup")
603

    
604
  global _EXIT_GANETI_NODED
605

    
606
  httpd = NodeDaemonHttpServer(('', port))
607
  while (not _EXIT_GANETI_NODED):
608
    httpd.handle_request()
609

    
610

    
611
if __name__ == '__main__':
612
  main()