Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-noded @ 99e88451

History | View | Annotate | Download (16.5 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti node daemon"""
23

    
24
# functions in this module need to have a given name structure, so:
25
# pylint: disable-msg=C0103
26

    
27
import os
28
import sys
29
import traceback
30
import SocketServer
31
import errno
32
import logging
33
import signal
34

    
35
from optparse import OptionParser
36

    
37
from ganeti import backend
38
from ganeti import logger
39
from ganeti import constants
40
from ganeti import objects
41
from ganeti import errors
42
from ganeti import ssconf
43
from ganeti import http
44
from ganeti import utils
45

    
46

    
47
class NodeDaemonRequestHandler(http.HTTPRequestHandler):
48
  """The server implementation.
49

    
50
  This class holds all methods exposed over the RPC interface.
51

    
52
  """
53
  def HandleRequest(self):
54
    """Handle a request.
55

    
56
    """
57
    if self.command.upper() != "PUT":
58
      raise http.HTTPBadRequest()
59

    
60
    path = self.path
61
    if path.startswith("/"):
62
      path = path[1:]
63

    
64
    method = getattr(self, "perspective_%s" % path, None)
65
    if method is None:
66
      raise httperror.HTTPNotFound()
67

    
68
    try:
69
      return method(self.post_data)
70
    except errors.QuitGanetiException, err:
71
      # Tell parent to quit
72
      os.kill(self.server.noded_pid, signal.SIGTERM)
73

    
74
  # the new block devices  --------------------------
75

    
76
  @staticmethod
77
  def perspective_blockdev_create(params):
78
    """Create a block device.
79

    
80
    """
81
    bdev_s, size, owner, on_primary, info = params
82
    bdev = objects.Disk.FromDict(bdev_s)
83
    if bdev is None:
84
      raise ValueError("can't unserialize data!")
85
    return backend.CreateBlockDevice(bdev, size, owner, on_primary, info)
86

    
87
  @staticmethod
88
  def perspective_blockdev_remove(params):
89
    """Remove a block device.
90

    
91
    """
92
    bdev_s = params[0]
93
    bdev = objects.Disk.FromDict(bdev_s)
94
    return backend.RemoveBlockDevice(bdev)
95

    
96
  @staticmethod
97
  def perspective_blockdev_rename(params):
98
    """Remove a block device.
99

    
100
    """
101
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
102
    return backend.RenameBlockDevices(devlist)
103

    
104
  @staticmethod
105
  def perspective_blockdev_assemble(params):
106
    """Assemble a block device.
107

    
108
    """
109
    bdev_s, owner, on_primary = params
110
    bdev = objects.Disk.FromDict(bdev_s)
111
    if bdev is None:
112
      raise ValueError("can't unserialize data!")
113
    return backend.AssembleBlockDevice(bdev, owner, on_primary)
114

    
115
  @staticmethod
116
  def perspective_blockdev_shutdown(params):
117
    """Shutdown a block device.
118

    
119
    """
120
    bdev_s = params[0]
121
    bdev = objects.Disk.FromDict(bdev_s)
122
    if bdev is None:
123
      raise ValueError("can't unserialize data!")
124
    return backend.ShutdownBlockDevice(bdev)
125

    
126
  @staticmethod
127
  def perspective_blockdev_addchildren(params):
128
    """Add a child to a mirror device.
129

    
130
    Note: this is only valid for mirror devices. It's the caller's duty
131
    to send a correct disk, otherwise we raise an error.
132

    
133
    """
134
    bdev_s, ndev_s = params
135
    bdev = objects.Disk.FromDict(bdev_s)
136
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
137
    if bdev is None or ndevs.count(None) > 0:
138
      raise ValueError("can't unserialize data!")
139
    return backend.MirrorAddChildren(bdev, ndevs)
140

    
141
  @staticmethod
142
  def perspective_blockdev_removechildren(params):
143
    """Remove a child from a mirror device.
144

    
145
    This is only valid for mirror devices, of course. It's the callers
146
    duty to send a correct disk, otherwise we raise an error.
147

    
148
    """
149
    bdev_s, ndev_s = params
150
    bdev = objects.Disk.FromDict(bdev_s)
151
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
152
    if bdev is None or ndevs.count(None) > 0:
153
      raise ValueError("can't unserialize data!")
154
    return backend.MirrorRemoveChildren(bdev, ndevs)
155

    
156
  @staticmethod
157
  def perspective_blockdev_getmirrorstatus(params):
158
    """Return the mirror status for a list of disks.
159

    
160
    """
161
    disks = [objects.Disk.FromDict(dsk_s)
162
            for dsk_s in params]
163
    return backend.GetMirrorStatus(disks)
164

    
165
  @staticmethod
166
  def perspective_blockdev_find(params):
167
    """Expose the FindBlockDevice functionality for a disk.
168

    
169
    This will try to find but not activate a disk.
170

    
171
    """
172
    disk = objects.Disk.FromDict(params[0])
173
    return backend.FindBlockDevice(disk)
174

    
175
  @staticmethod
176
  def perspective_blockdev_snapshot(params):
177
    """Create a snapshot device.
178

    
179
    Note that this is only valid for LVM disks, if we get passed
180
    something else we raise an exception. The snapshot device can be
181
    remove by calling the generic block device remove call.
182

    
183
    """
184
    cfbd = objects.Disk.FromDict(params[0])
185
    return backend.SnapshotBlockDevice(cfbd)
186

    
187
  @staticmethod
188
  def perspective_blockdev_grow(params):
189
    """Grow a stack of devices.
190

    
191
    """
192
    cfbd = objects.Disk.FromDict(params[0])
193
    amount = params[1]
194
    return backend.GrowBlockDevice(cfbd, amount)
195

    
196
  @staticmethod
197
  def perspective_blockdev_close(params):
198
    """Closes the given block devices.
199

    
200
    """
201
    disks = [objects.Disk.FromDict(cf) for cf in params]
202
    return backend.CloseBlockDevices(disks)
203

    
204
  # export/import  --------------------------
205

    
206
  @staticmethod
207
  def perspective_snapshot_export(params):
208
    """Export a given snapshot.
209

    
210
    """
211
    disk = objects.Disk.FromDict(params[0])
212
    dest_node = params[1]
213
    instance = objects.Instance.FromDict(params[2])
214
    return backend.ExportSnapshot(disk, dest_node, instance)
215

    
216
  @staticmethod
217
  def perspective_finalize_export(params):
218
    """Expose the finalize export functionality.
219

    
220
    """
221
    instance = objects.Instance.FromDict(params[0])
222
    snap_disks = [objects.Disk.FromDict(str_data)
223
                  for str_data in params[1]]
224
    return backend.FinalizeExport(instance, snap_disks)
225

    
226
  @staticmethod
227
  def perspective_export_info(params):
228
    """Query information about an existing export on this node.
229

    
230
    The given path may not contain an export, in which case we return
231
    None.
232

    
233
    """
234
    path = params[0]
235
    einfo = backend.ExportInfo(path)
236
    if einfo is None:
237
      return einfo
238
    return einfo.Dumps()
239

    
240
  @staticmethod
241
  def perspective_export_list(params):
242
    """List the available exports on this node.
243

    
244
    Note that as opposed to export_info, which may query data about an
245
    export in any path, this only queries the standard Ganeti path
246
    (constants.EXPORT_DIR).
247

    
248
    """
249
    return backend.ListExports()
250

    
251
  @staticmethod
252
  def perspective_export_remove(params):
253
    """Remove an export.
254

    
255
    """
256
    export = params[0]
257
    return backend.RemoveExport(export)
258

    
259
  # volume  --------------------------
260

    
261
  @staticmethod
262
  def perspective_volume_list(params):
263
    """Query the list of logical volumes in a given volume group.
264

    
265
    """
266
    vgname = params[0]
267
    return backend.GetVolumeList(vgname)
268

    
269
  @staticmethod
270
  def perspective_vg_list(params):
271
    """Query the list of volume groups.
272

    
273
    """
274
    return backend.ListVolumeGroups()
275

    
276
  # bridge  --------------------------
277

    
278
  @staticmethod
279
  def perspective_bridges_exist(params):
280
    """Check if all bridges given exist on this node.
281

    
282
    """
283
    bridges_list = params[0]
284
    return backend.BridgesExist(bridges_list)
285

    
286
  # instance  --------------------------
287

    
288
  @staticmethod
289
  def perspective_instance_os_add(params):
290
    """Install an OS on a given instance.
291

    
292
    """
293
    inst_s, os_disk, swap_disk = params
294
    inst = objects.Instance.FromDict(inst_s)
295
    return backend.AddOSToInstance(inst, os_disk, swap_disk)
296

    
297
  @staticmethod
298
  def perspective_instance_run_rename(params):
299
    """Runs the OS rename script for an instance.
300

    
301
    """
302
    inst_s, old_name, os_disk, swap_disk = params
303
    inst = objects.Instance.FromDict(inst_s)
304
    return backend.RunRenameInstance(inst, old_name, os_disk, swap_disk)
305

    
306
  @staticmethod
307
  def perspective_instance_os_import(params):
308
    """Run the import function of an OS onto a given instance.
309

    
310
    """
311
    inst_s, os_disk, swap_disk, src_node, src_image = params
312
    inst = objects.Instance.FromDict(inst_s)
313
    return backend.ImportOSIntoInstance(inst, os_disk, swap_disk,
314
                                        src_node, src_image)
315

    
316
  @staticmethod
317
  def perspective_instance_shutdown(params):
318
    """Shutdown an instance.
319

    
320
    """
321
    instance = objects.Instance.FromDict(params[0])
322
    return backend.ShutdownInstance(instance)
323

    
324
  @staticmethod
325
  def perspective_instance_start(params):
326
    """Start an instance.
327

    
328
    """
329
    instance = objects.Instance.FromDict(params[0])
330
    extra_args = params[1]
331
    return backend.StartInstance(instance, extra_args)
332

    
333
  @staticmethod
334
  def perspective_instance_migrate(params):
335
    """Migrates an instance.
336

    
337
    """
338
    instance, target, live = params
339
    return backend.MigrateInstance(instance, target, live)
340

    
341
  @staticmethod
342
  def perspective_instance_reboot(params):
343
    """Reboot an instance.
344

    
345
    """
346
    instance = objects.Instance.FromDict(params[0])
347
    reboot_type = params[1]
348
    extra_args = params[2]
349
    return backend.RebootInstance(instance, reboot_type, extra_args)
350

    
351
  @staticmethod
352
  def perspective_instance_info(params):
353
    """Query instance information.
354

    
355
    """
356
    return backend.GetInstanceInfo(params[0])
357

    
358
  @staticmethod
359
  def perspective_all_instances_info(params):
360
    """Query information about all instances.
361

    
362
    """
363
    return backend.GetAllInstancesInfo()
364

    
365
  @staticmethod
366
  def perspective_instance_list(params):
367
    """Query the list of running instances.
368

    
369
    """
370
    return backend.GetInstanceList()
371

    
372
  # node --------------------------
373

    
374
  @staticmethod
375
  def perspective_node_tcp_ping(params):
376
    """Do a TcpPing on the remote node.
377

    
378
    """
379
    return utils.TcpPing(params[1], params[2], timeout=params[3],
380
                         live_port_needed=params[4], source=params[0])
381

    
382
  @staticmethod
383
  def perspective_node_info(params):
384
    """Query node information.
385

    
386
    """
387
    vgname = params[0]
388
    return backend.GetNodeInfo(vgname)
389

    
390
  @staticmethod
391
  def perspective_node_add(params):
392
    """Complete the registration of this node in the cluster.
393

    
394
    """
395
    return backend.AddNode(params[0], params[1], params[2],
396
                           params[3], params[4], params[5])
397

    
398
  @staticmethod
399
  def perspective_node_verify(params):
400
    """Run a verify sequence on this node.
401

    
402
    """
403
    return backend.VerifyNode(params[0])
404

    
405
  @staticmethod
406
  def perspective_node_start_master(params):
407
    """Promote this node to master status.
408

    
409
    """
410
    return backend.StartMaster()
411

    
412
  @staticmethod
413
  def perspective_node_stop_master(params):
414
    """Demote this node from master status.
415

    
416
    """
417
    return backend.StopMaster()
418

    
419
  @staticmethod
420
  def perspective_node_leave_cluster(params):
421
    """Cleanup after leaving a cluster.
422

    
423
    """
424
    return backend.LeaveCluster()
425

    
426
  @staticmethod
427
  def perspective_node_volumes(params):
428
    """Query the list of all logical volume groups.
429

    
430
    """
431
    return backend.NodeVolumes()
432

    
433
  # cluster --------------------------
434

    
435
  @staticmethod
436
  def perspective_version(params):
437
    """Query version information.
438

    
439
    """
440
    return constants.PROTOCOL_VERSION
441

    
442
  @staticmethod
443
  def perspective_upload_file(params):
444
    """Upload a file.
445

    
446
    Note that the backend implementation imposes strict rules on which
447
    files are accepted.
448

    
449
    """
450
    return backend.UploadFile(*params)
451

    
452

    
453
  # os -----------------------
454

    
455
  @staticmethod
456
  def perspective_os_diagnose(params):
457
    """Query detailed information about existing OSes.
458

    
459
    """
460
    return [os.ToDict() for os in backend.DiagnoseOS()]
461

    
462
  @staticmethod
463
  def perspective_os_get(params):
464
    """Query information about a given OS.
465

    
466
    """
467
    name = params[0]
468
    try:
469
      os_obj = backend.OSFromDisk(name)
470
    except errors.InvalidOS, err:
471
      os_obj = objects.OS.FromInvalidOS(err)
472
    return os_obj.ToDict()
473

    
474
  # hooks -----------------------
475

    
476
  @staticmethod
477
  def perspective_hooks_runner(params):
478
    """Run hook scripts.
479

    
480
    """
481
    hpath, phase, env = params
482
    hr = backend.HooksRunner()
483
    return hr.RunHooks(hpath, phase, env)
484

    
485
  # iallocator -----------------
486

    
487
  @staticmethod
488
  def perspective_iallocator_runner(params):
489
    """Run an iallocator script.
490

    
491
    """
492
    name, idata = params
493
    iar = backend.IAllocatorRunner()
494
    return iar.Run(name, idata)
495

    
496
  # test -----------------------
497

    
498
  @staticmethod
499
  def perspective_test_delay(params):
500
    """Run test delay.
501

    
502
    """
503
    duration = params[0]
504
    return utils.TestDelay(duration)
505

    
506
  @staticmethod
507
  def perspective_file_storage_dir_create(params):
508
    """Create the file storage directory.
509

    
510
    """
511
    file_storage_dir = params[0]
512
    return backend.CreateFileStorageDir(file_storage_dir)
513

    
514
  @staticmethod
515
  def perspective_file_storage_dir_remove(params):
516
    """Remove the file storage directory.
517

    
518
    """
519
    file_storage_dir = params[0]
520
    return backend.RemoveFileStorageDir(file_storage_dir)
521

    
522
  @staticmethod
523
  def perspective_file_storage_dir_rename(params):
524
    """Rename the file storage directory.
525

    
526
    """
527
    old_file_storage_dir = params[0]
528
    new_file_storage_dir = params[1]
529
    return backend.RenameFileStorageDir(old_file_storage_dir,
530
                                        new_file_storage_dir)
531

    
532

    
533
class NodeDaemonHttpServer(http.HTTPServer):
534
  def __init__(self, server_address):
535
    http.HTTPServer.__init__(self, server_address, NodeDaemonRequestHandler)
536
    self.noded_pid = os.getpid()
537

    
538
  def serve_forever(self):
539
    """Handle requests until told to quit."""
540
    sighandler = utils.SignalHandler([signal.SIGINT, signal.SIGTERM])
541
    try:
542
      while not sighandler.called:
543
        self.handle_request()
544
      # TODO: There could be children running at this point
545
    finally:
546
      sighandler.Reset()
547

    
548

    
549
class ForkingHTTPServer(SocketServer.ForkingMixIn, NodeDaemonHttpServer):
550
  """Forking HTTP Server.
551

    
552
  This inherits from ForkingMixIn and HTTPServer in order to fork for each
553
  request we handle. This allows more requests to be handled concurrently.
554

    
555
  """
556

    
557

    
558
def ParseOptions():
559
  """Parse the command line options.
560

    
561
  Returns:
562
    (options, args) as from OptionParser.parse_args()
563

    
564
  """
565
  parser = OptionParser(description="Ganeti node daemon",
566
                        usage="%prog [-f] [-d]",
567
                        version="%%prog (ganeti) %s" %
568
                        constants.RELEASE_VERSION)
569

    
570
  parser.add_option("-f", "--foreground", dest="fork",
571
                    help="Don't detach from the current terminal",
572
                    default=True, action="store_false")
573
  parser.add_option("-d", "--debug", dest="debug",
574
                    help="Enable some debug messages",
575
                    default=False, action="store_true")
576
  options, args = parser.parse_args()
577
  return options, args
578

    
579

    
580
def main():
581
  """Main function for the node daemon.
582

    
583
  """
584
  options, args = ParseOptions()
585
  utils.debug = options.debug
586
  for fname in (constants.SSL_CERT_FILE,):
587
    if not os.path.isfile(fname):
588
      print "config %s not there, will not run." % fname
589
      sys.exit(5)
590

    
591
  try:
592
    ss = ssconf.SimpleStore()
593
    port = ss.GetNodeDaemonPort()
594
    pwdata = ss.GetNodeDaemonPassword()
595
  except errors.ConfigurationError, err:
596
    print "Cluster configuration incomplete: '%s'" % str(err)
597
    sys.exit(5)
598

    
599
  # create the various SUB_RUN_DIRS, if not existing, so that we handle the
600
  # situation where RUN_DIR is tmpfs
601
  for dir_name in constants.SUB_RUN_DIRS:
602
    if not os.path.exists(dir_name):
603
      try:
604
        os.mkdir(dir_name, 0755)
605
      except EnvironmentError, err:
606
        if err.errno != errno.EEXIST:
607
          print ("Node setup wrong, cannot create directory %s: %s" %
608
                 (dir_name, err))
609
          sys.exit(5)
610
    if not os.path.isdir(dir_name):
611
      print ("Node setup wrong, %s is not a directory" % dir_name)
612
      sys.exit(5)
613

    
614
  # become a daemon
615
  if options.fork:
616
    utils.Daemonize(logfile=constants.LOG_NODESERVER)
617

    
618
  utils.WritePidFile(constants.NODED_PID)
619

    
620
  logger.SetupDaemon(logfile=constants.LOG_NODESERVER, debug=options.debug,
621
                     stderr_logging=not options.fork)
622
  logging.info("ganeti node daemon startup")
623

    
624
  if options.fork:
625
    server = ForkingHTTPServer(('', port))
626
  else:
627
    server = NodeDaemonHttpServer(('', port))
628

    
629
  try:
630
    server.serve_forever()
631
  finally:
632
    utils.RemovePidFile(constants.NODED_PID)
633

    
634

    
635
if __name__ == '__main__':
636
  main()