Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-noded @ af5ebcb1

History | View | Annotate | Download (17.7 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti node daemon"""
23

    
24
# functions in this module need to have a given name structure, so:
25
# pylint: disable-msg=C0103
26

    
27
import os
28
import sys
29
import traceback
30
import SocketServer
31
import errno
32
import logging
33
import signal
34

    
35
from optparse import OptionParser
36

    
37
from ganeti import backend
38
from ganeti import logger
39
from ganeti import constants
40
from ganeti import objects
41
from ganeti import errors
42
from ganeti import jstore
43
from ganeti import ssconf
44
from ganeti import http
45
from ganeti import utils
46

    
47

    
48
queue_lock = None
49

    
50

    
51
def _RequireJobQueueLock(fn):
52
  """Decorator for job queue manipulating functions.
53

    
54
  """
55
  def wrapper(*args, **kwargs):
56
    # Locking in exclusive, blocking mode because there could be several
57
    # children running at the same time.
58
    # TODO: Implement nonblocking locking with retries?
59
    queue_lock.Exclusive(blocking=True)
60
    try:
61
      return fn(*args, **kwargs)
62
    finally:
63
      queue_lock.Unlock()
64
  return wrapper
65

    
66

    
67
class NodeDaemonRequestHandler(http.HTTPRequestHandler):
68
  """The server implementation.
69

    
70
  This class holds all methods exposed over the RPC interface.
71

    
72
  """
73
  def HandleRequest(self):
74
    """Handle a request.
75

    
76
    """
77
    if self.command.upper() != "PUT":
78
      raise http.HTTPBadRequest()
79

    
80
    path = self.path
81
    if path.startswith("/"):
82
      path = path[1:]
83

    
84
    method = getattr(self, "perspective_%s" % path, None)
85
    if method is None:
86
      raise httperror.HTTPNotFound()
87

    
88
    try:
89
      try:
90
        return method(self.post_data)
91
      except:
92
        logging.exception("Error in RPC call")
93
        raise
94
    except errors.QuitGanetiException, err:
95
      # Tell parent to quit
96
      os.kill(self.server.noded_pid, signal.SIGTERM)
97

    
98
  # the new block devices  --------------------------
99

    
100
  @staticmethod
101
  def perspective_blockdev_create(params):
102
    """Create a block device.
103

    
104
    """
105
    bdev_s, size, owner, on_primary, info = params
106
    bdev = objects.Disk.FromDict(bdev_s)
107
    if bdev is None:
108
      raise ValueError("can't unserialize data!")
109
    return backend.CreateBlockDevice(bdev, size, owner, on_primary, info)
110

    
111
  @staticmethod
112
  def perspective_blockdev_remove(params):
113
    """Remove a block device.
114

    
115
    """
116
    bdev_s = params[0]
117
    bdev = objects.Disk.FromDict(bdev_s)
118
    return backend.RemoveBlockDevice(bdev)
119

    
120
  @staticmethod
121
  def perspective_blockdev_rename(params):
122
    """Remove a block device.
123

    
124
    """
125
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
126
    return backend.RenameBlockDevices(devlist)
127

    
128
  @staticmethod
129
  def perspective_blockdev_assemble(params):
130
    """Assemble a block device.
131

    
132
    """
133
    bdev_s, owner, on_primary = params
134
    bdev = objects.Disk.FromDict(bdev_s)
135
    if bdev is None:
136
      raise ValueError("can't unserialize data!")
137
    return backend.AssembleBlockDevice(bdev, owner, on_primary)
138

    
139
  @staticmethod
140
  def perspective_blockdev_shutdown(params):
141
    """Shutdown a block device.
142

    
143
    """
144
    bdev_s = params[0]
145
    bdev = objects.Disk.FromDict(bdev_s)
146
    if bdev is None:
147
      raise ValueError("can't unserialize data!")
148
    return backend.ShutdownBlockDevice(bdev)
149

    
150
  @staticmethod
151
  def perspective_blockdev_addchildren(params):
152
    """Add a child to a mirror device.
153

    
154
    Note: this is only valid for mirror devices. It's the caller's duty
155
    to send a correct disk, otherwise we raise an error.
156

    
157
    """
158
    bdev_s, ndev_s = params
159
    bdev = objects.Disk.FromDict(bdev_s)
160
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
161
    if bdev is None or ndevs.count(None) > 0:
162
      raise ValueError("can't unserialize data!")
163
    return backend.MirrorAddChildren(bdev, ndevs)
164

    
165
  @staticmethod
166
  def perspective_blockdev_removechildren(params):
167
    """Remove a child from a mirror device.
168

    
169
    This is only valid for mirror devices, of course. It's the callers
170
    duty to send a correct disk, otherwise we raise an error.
171

    
172
    """
173
    bdev_s, ndev_s = params
174
    bdev = objects.Disk.FromDict(bdev_s)
175
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
176
    if bdev is None or ndevs.count(None) > 0:
177
      raise ValueError("can't unserialize data!")
178
    return backend.MirrorRemoveChildren(bdev, ndevs)
179

    
180
  @staticmethod
181
  def perspective_blockdev_getmirrorstatus(params):
182
    """Return the mirror status for a list of disks.
183

    
184
    """
185
    disks = [objects.Disk.FromDict(dsk_s)
186
            for dsk_s in params]
187
    return backend.GetMirrorStatus(disks)
188

    
189
  @staticmethod
190
  def perspective_blockdev_find(params):
191
    """Expose the FindBlockDevice functionality for a disk.
192

    
193
    This will try to find but not activate a disk.
194

    
195
    """
196
    disk = objects.Disk.FromDict(params[0])
197
    return backend.FindBlockDevice(disk)
198

    
199
  @staticmethod
200
  def perspective_blockdev_snapshot(params):
201
    """Create a snapshot device.
202

    
203
    Note that this is only valid for LVM disks, if we get passed
204
    something else we raise an exception. The snapshot device can be
205
    remove by calling the generic block device remove call.
206

    
207
    """
208
    cfbd = objects.Disk.FromDict(params[0])
209
    return backend.SnapshotBlockDevice(cfbd)
210

    
211
  @staticmethod
212
  def perspective_blockdev_grow(params):
213
    """Grow a stack of devices.
214

    
215
    """
216
    cfbd = objects.Disk.FromDict(params[0])
217
    amount = params[1]
218
    return backend.GrowBlockDevice(cfbd, amount)
219

    
220
  @staticmethod
221
  def perspective_blockdev_close(params):
222
    """Closes the given block devices.
223

    
224
    """
225
    disks = [objects.Disk.FromDict(cf) for cf in params]
226
    return backend.CloseBlockDevices(disks)
227

    
228
  # export/import  --------------------------
229

    
230
  @staticmethod
231
  def perspective_snapshot_export(params):
232
    """Export a given snapshot.
233

    
234
    """
235
    disk = objects.Disk.FromDict(params[0])
236
    dest_node = params[1]
237
    instance = objects.Instance.FromDict(params[2])
238
    return backend.ExportSnapshot(disk, dest_node, instance)
239

    
240
  @staticmethod
241
  def perspective_finalize_export(params):
242
    """Expose the finalize export functionality.
243

    
244
    """
245
    instance = objects.Instance.FromDict(params[0])
246
    snap_disks = [objects.Disk.FromDict(str_data)
247
                  for str_data in params[1]]
248
    return backend.FinalizeExport(instance, snap_disks)
249

    
250
  @staticmethod
251
  def perspective_export_info(params):
252
    """Query information about an existing export on this node.
253

    
254
    The given path may not contain an export, in which case we return
255
    None.
256

    
257
    """
258
    path = params[0]
259
    einfo = backend.ExportInfo(path)
260
    if einfo is None:
261
      return einfo
262
    return einfo.Dumps()
263

    
264
  @staticmethod
265
  def perspective_export_list(params):
266
    """List the available exports on this node.
267

    
268
    Note that as opposed to export_info, which may query data about an
269
    export in any path, this only queries the standard Ganeti path
270
    (constants.EXPORT_DIR).
271

    
272
    """
273
    return backend.ListExports()
274

    
275
  @staticmethod
276
  def perspective_export_remove(params):
277
    """Remove an export.
278

    
279
    """
280
    export = params[0]
281
    return backend.RemoveExport(export)
282

    
283
  # volume  --------------------------
284

    
285
  @staticmethod
286
  def perspective_volume_list(params):
287
    """Query the list of logical volumes in a given volume group.
288

    
289
    """
290
    vgname = params[0]
291
    return backend.GetVolumeList(vgname)
292

    
293
  @staticmethod
294
  def perspective_vg_list(params):
295
    """Query the list of volume groups.
296

    
297
    """
298
    return backend.ListVolumeGroups()
299

    
300
  # bridge  --------------------------
301

    
302
  @staticmethod
303
  def perspective_bridges_exist(params):
304
    """Check if all bridges given exist on this node.
305

    
306
    """
307
    bridges_list = params[0]
308
    return backend.BridgesExist(bridges_list)
309

    
310
  # instance  --------------------------
311

    
312
  @staticmethod
313
  def perspective_instance_os_add(params):
314
    """Install an OS on a given instance.
315

    
316
    """
317
    inst_s, os_disk, swap_disk = params
318
    inst = objects.Instance.FromDict(inst_s)
319
    return backend.AddOSToInstance(inst, os_disk, swap_disk)
320

    
321
  @staticmethod
322
  def perspective_instance_run_rename(params):
323
    """Runs the OS rename script for an instance.
324

    
325
    """
326
    inst_s, old_name, os_disk, swap_disk = params
327
    inst = objects.Instance.FromDict(inst_s)
328
    return backend.RunRenameInstance(inst, old_name, os_disk, swap_disk)
329

    
330
  @staticmethod
331
  def perspective_instance_os_import(params):
332
    """Run the import function of an OS onto a given instance.
333

    
334
    """
335
    inst_s, os_disk, swap_disk, src_node, src_image = params
336
    inst = objects.Instance.FromDict(inst_s)
337
    return backend.ImportOSIntoInstance(inst, os_disk, swap_disk,
338
                                        src_node, src_image)
339

    
340
  @staticmethod
341
  def perspective_instance_shutdown(params):
342
    """Shutdown an instance.
343

    
344
    """
345
    instance = objects.Instance.FromDict(params[0])
346
    return backend.ShutdownInstance(instance)
347

    
348
  @staticmethod
349
  def perspective_instance_start(params):
350
    """Start an instance.
351

    
352
    """
353
    instance = objects.Instance.FromDict(params[0])
354
    extra_args = params[1]
355
    return backend.StartInstance(instance, extra_args)
356

    
357
  @staticmethod
358
  def perspective_instance_migrate(params):
359
    """Migrates an instance.
360

    
361
    """
362
    instance, target, live = params
363
    return backend.MigrateInstance(instance, target, live)
364

    
365
  @staticmethod
366
  def perspective_instance_reboot(params):
367
    """Reboot an instance.
368

    
369
    """
370
    instance = objects.Instance.FromDict(params[0])
371
    reboot_type = params[1]
372
    extra_args = params[2]
373
    return backend.RebootInstance(instance, reboot_type, extra_args)
374

    
375
  @staticmethod
376
  def perspective_instance_info(params):
377
    """Query instance information.
378

    
379
    """
380
    return backend.GetInstanceInfo(params[0])
381

    
382
  @staticmethod
383
  def perspective_all_instances_info(params):
384
    """Query information about all instances.
385

    
386
    """
387
    return backend.GetAllInstancesInfo()
388

    
389
  @staticmethod
390
  def perspective_instance_list(params):
391
    """Query the list of running instances.
392

    
393
    """
394
    return backend.GetInstanceList()
395

    
396
  # node --------------------------
397

    
398
  @staticmethod
399
  def perspective_node_tcp_ping(params):
400
    """Do a TcpPing on the remote node.
401

    
402
    """
403
    return utils.TcpPing(params[1], params[2], timeout=params[3],
404
                         live_port_needed=params[4], source=params[0])
405

    
406
  @staticmethod
407
  def perspective_node_info(params):
408
    """Query node information.
409

    
410
    """
411
    vgname = params[0]
412
    return backend.GetNodeInfo(vgname)
413

    
414
  @staticmethod
415
  def perspective_node_add(params):
416
    """Complete the registration of this node in the cluster.
417

    
418
    """
419
    return backend.AddNode(params[0], params[1], params[2],
420
                           params[3], params[4], params[5])
421

    
422
  @staticmethod
423
  def perspective_node_verify(params):
424
    """Run a verify sequence on this node.
425

    
426
    """
427
    return backend.VerifyNode(params[0])
428

    
429
  @staticmethod
430
  def perspective_node_start_master(params):
431
    """Promote this node to master status.
432

    
433
    """
434
    return backend.StartMaster(params[0])
435

    
436
  @staticmethod
437
  def perspective_node_stop_master(params):
438
    """Demote this node from master status.
439

    
440
    """
441
    return backend.StopMaster(params[0])
442

    
443
  @staticmethod
444
  def perspective_node_leave_cluster(params):
445
    """Cleanup after leaving a cluster.
446

    
447
    """
448
    return backend.LeaveCluster()
449

    
450
  @staticmethod
451
  def perspective_node_volumes(params):
452
    """Query the list of all logical volume groups.
453

    
454
    """
455
    return backend.NodeVolumes()
456

    
457
  # cluster --------------------------
458

    
459
  @staticmethod
460
  def perspective_version(params):
461
    """Query version information.
462

    
463
    """
464
    return constants.PROTOCOL_VERSION
465

    
466
  @staticmethod
467
  def perspective_upload_file(params):
468
    """Upload a file.
469

    
470
    Note that the backend implementation imposes strict rules on which
471
    files are accepted.
472

    
473
    """
474
    return backend.UploadFile(*params)
475

    
476

    
477
  # os -----------------------
478

    
479
  @staticmethod
480
  def perspective_os_diagnose(params):
481
    """Query detailed information about existing OSes.
482

    
483
    """
484
    return [os.ToDict() for os in backend.DiagnoseOS()]
485

    
486
  @staticmethod
487
  def perspective_os_get(params):
488
    """Query information about a given OS.
489

    
490
    """
491
    name = params[0]
492
    try:
493
      os_obj = backend.OSFromDisk(name)
494
    except errors.InvalidOS, err:
495
      os_obj = objects.OS.FromInvalidOS(err)
496
    return os_obj.ToDict()
497

    
498
  # hooks -----------------------
499

    
500
  @staticmethod
501
  def perspective_hooks_runner(params):
502
    """Run hook scripts.
503

    
504
    """
505
    hpath, phase, env = params
506
    hr = backend.HooksRunner()
507
    return hr.RunHooks(hpath, phase, env)
508

    
509
  # iallocator -----------------
510

    
511
  @staticmethod
512
  def perspective_iallocator_runner(params):
513
    """Run an iallocator script.
514

    
515
    """
516
    name, idata = params
517
    iar = backend.IAllocatorRunner()
518
    return iar.Run(name, idata)
519

    
520
  # test -----------------------
521

    
522
  @staticmethod
523
  def perspective_test_delay(params):
524
    """Run test delay.
525

    
526
    """
527
    duration = params[0]
528
    return utils.TestDelay(duration)
529

    
530
  @staticmethod
531
  def perspective_file_storage_dir_create(params):
532
    """Create the file storage directory.
533

    
534
    """
535
    file_storage_dir = params[0]
536
    return backend.CreateFileStorageDir(file_storage_dir)
537

    
538
  @staticmethod
539
  def perspective_file_storage_dir_remove(params):
540
    """Remove the file storage directory.
541

    
542
    """
543
    file_storage_dir = params[0]
544
    return backend.RemoveFileStorageDir(file_storage_dir)
545

    
546
  @staticmethod
547
  def perspective_file_storage_dir_rename(params):
548
    """Rename the file storage directory.
549

    
550
    """
551
    old_file_storage_dir = params[0]
552
    new_file_storage_dir = params[1]
553
    return backend.RenameFileStorageDir(old_file_storage_dir,
554
                                        new_file_storage_dir)
555

    
556
  @staticmethod
557
  @_RequireJobQueueLock
558
  def perspective_jobqueue_update(params):
559
    """Update job queue.
560

    
561
    """
562
    (file_name, content) = params
563
    return backend.JobQueueUpdate(file_name, content)
564

    
565
  @staticmethod
566
  def perspective_jobqueue_purge(params):
567
    """Purge job queue.
568

    
569
    """
570
    return backend.JobQueuePurge()
571

    
572
  @staticmethod
573
  @_RequireJobQueueLock
574
  def perspective_jobqueue_rename(params):
575
    """Rename a job queue file.
576

    
577
    """
578
    (old, new) = params
579

    
580
    return backend.JobQueueRename(old, new)
581

    
582

    
583
class NodeDaemonHttpServer(http.HTTPServer):
584
  def __init__(self, server_address):
585
    http.HTTPServer.__init__(self, server_address, NodeDaemonRequestHandler)
586
    self.noded_pid = os.getpid()
587

    
588
  def serve_forever(self):
589
    """Handle requests until told to quit."""
590
    sighandler = utils.SignalHandler([signal.SIGINT, signal.SIGTERM])
591
    try:
592
      while not sighandler.called:
593
        self.handle_request()
594
      # TODO: There could be children running at this point
595
    finally:
596
      sighandler.Reset()
597

    
598

    
599
class ForkingHTTPServer(SocketServer.ForkingMixIn, NodeDaemonHttpServer):
600
  """Forking HTTP Server.
601

    
602
  This inherits from ForkingMixIn and HTTPServer in order to fork for each
603
  request we handle. This allows more requests to be handled concurrently.
604

    
605
  """
606

    
607

    
608
def ParseOptions():
609
  """Parse the command line options.
610

    
611
  Returns:
612
    (options, args) as from OptionParser.parse_args()
613

    
614
  """
615
  parser = OptionParser(description="Ganeti node daemon",
616
                        usage="%prog [-f] [-d]",
617
                        version="%%prog (ganeti) %s" %
618
                        constants.RELEASE_VERSION)
619

    
620
  parser.add_option("-f", "--foreground", dest="fork",
621
                    help="Don't detach from the current terminal",
622
                    default=True, action="store_false")
623
  parser.add_option("-d", "--debug", dest="debug",
624
                    help="Enable some debug messages",
625
                    default=False, action="store_true")
626
  options, args = parser.parse_args()
627
  return options, args
628

    
629

    
630
def main():
631
  """Main function for the node daemon.
632

    
633
  """
634
  global queue_lock
635

    
636
  options, args = ParseOptions()
637
  utils.debug = options.debug
638
  for fname in (constants.SSL_CERT_FILE,):
639
    if not os.path.isfile(fname):
640
      print "config %s not there, will not run." % fname
641
      sys.exit(5)
642

    
643
  try:
644
    ss = ssconf.SimpleStore()
645
    port = ss.GetNodeDaemonPort()
646
    pwdata = ss.GetNodeDaemonPassword()
647
  except errors.ConfigurationError, err:
648
    print "Cluster configuration incomplete: '%s'" % str(err)
649
    sys.exit(5)
650

    
651
  # create the various SUB_RUN_DIRS, if not existing, so that we handle the
652
  # situation where RUN_DIR is tmpfs
653
  for dir_name in constants.SUB_RUN_DIRS:
654
    if not os.path.exists(dir_name):
655
      try:
656
        os.mkdir(dir_name, 0755)
657
      except EnvironmentError, err:
658
        if err.errno != errno.EEXIST:
659
          print ("Node setup wrong, cannot create directory %s: %s" %
660
                 (dir_name, err))
661
          sys.exit(5)
662
    if not os.path.isdir(dir_name):
663
      print ("Node setup wrong, %s is not a directory" % dir_name)
664
      sys.exit(5)
665

    
666
  # become a daemon
667
  if options.fork:
668
    utils.Daemonize(logfile=constants.LOG_NODESERVER)
669

    
670
  utils.WritePidFile(constants.NODED_PID)
671

    
672
  logger.SetupLogging(logfile=constants.LOG_NODESERVER, debug=options.debug,
673
                      stderr_logging=not options.fork)
674
  logging.info("ganeti node daemon startup")
675

    
676
  # Prepare job queue
677
  queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
678

    
679
  if options.fork:
680
    server = ForkingHTTPServer(('', port))
681
  else:
682
    server = NodeDaemonHttpServer(('', port))
683

    
684
  try:
685
    server.serve_forever()
686
  finally:
687
    utils.RemovePidFile(constants.NODED_PID)
688

    
689

    
690
if __name__ == '__main__':
691
  main()