Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-noded @ cc28af80

History | View | Annotate | Download (17.5 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti node daemon"""
23

    
24
# functions in this module need to have a given name structure, so:
25
# pylint: disable-msg=C0103
26

    
27
import os
28
import sys
29
import traceback
30
import SocketServer
31
import errno
32
import logging
33
import signal
34

    
35
from optparse import OptionParser
36

    
37
from ganeti import backend
38
from ganeti import logger
39
from ganeti import constants
40
from ganeti import objects
41
from ganeti import errors
42
from ganeti import jstore
43
from ganeti import daemon
44
from ganeti import http
45
from ganeti import utils
46

    
47

    
48
queue_lock = None
49

    
50

    
51
def _RequireJobQueueLock(fn):
52
  """Decorator for job queue manipulating functions.
53

    
54
  """
55
  QUEUE_LOCK_TIMEOUT = 10
56

    
57
  def wrapper(*args, **kwargs):
58
    # Locking in exclusive, blocking mode because there could be several
59
    # children running at the same time. Waiting up to 10 seconds.
60
    queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
61
    try:
62
      return fn(*args, **kwargs)
63
    finally:
64
      queue_lock.Unlock()
65

    
66
  return wrapper
67

    
68

    
69
class NodeHttpServer(http.HttpServer):
70
  """The server implementation.
71

    
72
  This class holds all methods exposed over the RPC interface.
73

    
74
  """
75
  def __init__(self, *args, **kwargs):
76
    http.HttpServer.__init__(self, *args, **kwargs)
77
    self.noded_pid = os.getpid()
78

    
79
  def HandleRequest(self, req):
80
    """Handle a request.
81

    
82
    """
83
    if req.request_method.upper() != "PUT":
84
      raise http.HTTPBadRequest()
85

    
86
    path = req.request_path
87
    if path.startswith("/"):
88
      path = path[1:]
89

    
90
    method = getattr(self, "perspective_%s" % path, None)
91
    if method is None:
92
      raise http.HTTPNotFound()
93

    
94
    try:
95
      try:
96
        return method(req.request_post_data)
97
      except:
98
        logging.exception("Error in RPC call")
99
        raise
100
    except errors.QuitGanetiException, err:
101
      # Tell parent to quit
102
      os.kill(self.noded_pid, signal.SIGTERM)
103

    
104
  # the new block devices  --------------------------
105

    
106
  @staticmethod
107
  def perspective_blockdev_create(params):
108
    """Create a block device.
109

    
110
    """
111
    bdev_s, size, owner, on_primary, info = params
112
    bdev = objects.Disk.FromDict(bdev_s)
113
    if bdev is None:
114
      raise ValueError("can't unserialize data!")
115
    return backend.CreateBlockDevice(bdev, size, owner, on_primary, info)
116

    
117
  @staticmethod
118
  def perspective_blockdev_remove(params):
119
    """Remove a block device.
120

    
121
    """
122
    bdev_s = params[0]
123
    bdev = objects.Disk.FromDict(bdev_s)
124
    return backend.RemoveBlockDevice(bdev)
125

    
126
  @staticmethod
127
  def perspective_blockdev_rename(params):
128
    """Remove a block device.
129

    
130
    """
131
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
132
    return backend.RenameBlockDevices(devlist)
133

    
134
  @staticmethod
135
  def perspective_blockdev_assemble(params):
136
    """Assemble a block device.
137

    
138
    """
139
    bdev_s, owner, on_primary = params
140
    bdev = objects.Disk.FromDict(bdev_s)
141
    if bdev is None:
142
      raise ValueError("can't unserialize data!")
143
    return backend.AssembleBlockDevice(bdev, owner, on_primary)
144

    
145
  @staticmethod
146
  def perspective_blockdev_shutdown(params):
147
    """Shutdown a block device.
148

    
149
    """
150
    bdev_s = params[0]
151
    bdev = objects.Disk.FromDict(bdev_s)
152
    if bdev is None:
153
      raise ValueError("can't unserialize data!")
154
    return backend.ShutdownBlockDevice(bdev)
155

    
156
  @staticmethod
157
  def perspective_blockdev_addchildren(params):
158
    """Add a child to a mirror device.
159

    
160
    Note: this is only valid for mirror devices. It's the caller's duty
161
    to send a correct disk, otherwise we raise an error.
162

    
163
    """
164
    bdev_s, ndev_s = params
165
    bdev = objects.Disk.FromDict(bdev_s)
166
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
167
    if bdev is None or ndevs.count(None) > 0:
168
      raise ValueError("can't unserialize data!")
169
    return backend.MirrorAddChildren(bdev, ndevs)
170

    
171
  @staticmethod
172
  def perspective_blockdev_removechildren(params):
173
    """Remove a child from a mirror device.
174

    
175
    This is only valid for mirror devices, of course. It's the callers
176
    duty to send a correct disk, otherwise we raise an error.
177

    
178
    """
179
    bdev_s, ndev_s = params
180
    bdev = objects.Disk.FromDict(bdev_s)
181
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
182
    if bdev is None or ndevs.count(None) > 0:
183
      raise ValueError("can't unserialize data!")
184
    return backend.MirrorRemoveChildren(bdev, ndevs)
185

    
186
  @staticmethod
187
  def perspective_blockdev_getmirrorstatus(params):
188
    """Return the mirror status for a list of disks.
189

    
190
    """
191
    disks = [objects.Disk.FromDict(dsk_s)
192
            for dsk_s in params]
193
    return backend.GetMirrorStatus(disks)
194

    
195
  @staticmethod
196
  def perspective_blockdev_find(params):
197
    """Expose the FindBlockDevice functionality for a disk.
198

    
199
    This will try to find but not activate a disk.
200

    
201
    """
202
    disk = objects.Disk.FromDict(params[0])
203
    return backend.FindBlockDevice(disk)
204

    
205
  @staticmethod
206
  def perspective_blockdev_snapshot(params):
207
    """Create a snapshot device.
208

    
209
    Note that this is only valid for LVM disks, if we get passed
210
    something else we raise an exception. The snapshot device can be
211
    remove by calling the generic block device remove call.
212

    
213
    """
214
    cfbd = objects.Disk.FromDict(params[0])
215
    return backend.SnapshotBlockDevice(cfbd)
216

    
217
  @staticmethod
218
  def perspective_blockdev_grow(params):
219
    """Grow a stack of devices.
220

    
221
    """
222
    cfbd = objects.Disk.FromDict(params[0])
223
    amount = params[1]
224
    return backend.GrowBlockDevice(cfbd, amount)
225

    
226
  @staticmethod
227
  def perspective_blockdev_close(params):
228
    """Closes the given block devices.
229

    
230
    """
231
    disks = [objects.Disk.FromDict(cf) for cf in params]
232
    return backend.CloseBlockDevices(disks)
233

    
234
  # export/import  --------------------------
235

    
236
  @staticmethod
237
  def perspective_snapshot_export(params):
238
    """Export a given snapshot.
239

    
240
    """
241
    disk = objects.Disk.FromDict(params[0])
242
    dest_node = params[1]
243
    instance = objects.Instance.FromDict(params[2])
244
    cluster_name = params[3]
245
    return backend.ExportSnapshot(disk, dest_node, instance, cluster_name)
246

    
247
  @staticmethod
248
  def perspective_finalize_export(params):
249
    """Expose the finalize export functionality.
250

    
251
    """
252
    instance = objects.Instance.FromDict(params[0])
253
    snap_disks = [objects.Disk.FromDict(str_data)
254
                  for str_data in params[1]]
255
    return backend.FinalizeExport(instance, snap_disks)
256

    
257
  @staticmethod
258
  def perspective_export_info(params):
259
    """Query information about an existing export on this node.
260

    
261
    The given path may not contain an export, in which case we return
262
    None.
263

    
264
    """
265
    path = params[0]
266
    einfo = backend.ExportInfo(path)
267
    if einfo is None:
268
      return einfo
269
    return einfo.Dumps()
270

    
271
  @staticmethod
272
  def perspective_export_list(params):
273
    """List the available exports on this node.
274

    
275
    Note that as opposed to export_info, which may query data about an
276
    export in any path, this only queries the standard Ganeti path
277
    (constants.EXPORT_DIR).
278

    
279
    """
280
    return backend.ListExports()
281

    
282
  @staticmethod
283
  def perspective_export_remove(params):
284
    """Remove an export.
285

    
286
    """
287
    export = params[0]
288
    return backend.RemoveExport(export)
289

    
290
  # volume  --------------------------
291

    
292
  @staticmethod
293
  def perspective_volume_list(params):
294
    """Query the list of logical volumes in a given volume group.
295

    
296
    """
297
    vgname = params[0]
298
    return backend.GetVolumeList(vgname)
299

    
300
  @staticmethod
301
  def perspective_vg_list(params):
302
    """Query the list of volume groups.
303

    
304
    """
305
    return backend.ListVolumeGroups()
306

    
307
  # bridge  --------------------------
308

    
309
  @staticmethod
310
  def perspective_bridges_exist(params):
311
    """Check if all bridges given exist on this node.
312

    
313
    """
314
    bridges_list = params[0]
315
    return backend.BridgesExist(bridges_list)
316

    
317
  # instance  --------------------------
318

    
319
  @staticmethod
320
  def perspective_instance_os_add(params):
321
    """Install an OS on a given instance.
322

    
323
    """
324
    inst_s, os_disk, swap_disk = params
325
    inst = objects.Instance.FromDict(inst_s)
326
    return backend.AddOSToInstance(inst, os_disk, swap_disk)
327

    
328
  @staticmethod
329
  def perspective_instance_run_rename(params):
330
    """Runs the OS rename script for an instance.
331

    
332
    """
333
    inst_s, old_name, os_disk, swap_disk = params
334
    inst = objects.Instance.FromDict(inst_s)
335
    return backend.RunRenameInstance(inst, old_name, os_disk, swap_disk)
336

    
337
  @staticmethod
338
  def perspective_instance_os_import(params):
339
    """Run the import function of an OS onto a given instance.
340

    
341
    """
342
    inst_s, os_disk, swap_disk, src_node, src_image, cluster_name = params
343
    inst = objects.Instance.FromDict(inst_s)
344
    return backend.ImportOSIntoInstance(inst, os_disk, swap_disk,
345
                                        src_node, src_image, cluster_name)
346

    
347
  @staticmethod
348
  def perspective_instance_shutdown(params):
349
    """Shutdown an instance.
350

    
351
    """
352
    instance = objects.Instance.FromDict(params[0])
353
    return backend.ShutdownInstance(instance)
354

    
355
  @staticmethod
356
  def perspective_instance_start(params):
357
    """Start an instance.
358

    
359
    """
360
    instance = objects.Instance.FromDict(params[0])
361
    extra_args = params[1]
362
    return backend.StartInstance(instance, extra_args)
363

    
364
  @staticmethod
365
  def perspective_instance_migrate(params):
366
    """Migrates an instance.
367

    
368
    """
369
    instance, target, live = params
370
    instance = objects.Instance.FromDict(instance)
371
    return backend.MigrateInstance(instance, target, live)
372

    
373
  @staticmethod
374
  def perspective_instance_reboot(params):
375
    """Reboot an instance.
376

    
377
    """
378
    instance = objects.Instance.FromDict(params[0])
379
    reboot_type = params[1]
380
    extra_args = params[2]
381
    return backend.RebootInstance(instance, reboot_type, extra_args)
382

    
383
  @staticmethod
384
  def perspective_instance_info(params):
385
    """Query instance information.
386

    
387
    """
388
    return backend.GetInstanceInfo(params[0])
389

    
390
  @staticmethod
391
  def perspective_all_instances_info(params):
392
    """Query information about all instances.
393

    
394
    """
395
    return backend.GetAllInstancesInfo(params[0])
396

    
397
  @staticmethod
398
  def perspective_instance_list(params):
399
    """Query the list of running instances.
400

    
401
    """
402
    return backend.GetInstanceList(params[0])
403

    
404
  # node --------------------------
405

    
406
  @staticmethod
407
  def perspective_node_tcp_ping(params):
408
    """Do a TcpPing on the remote node.
409

    
410
    """
411
    return utils.TcpPing(params[1], params[2], timeout=params[3],
412
                         live_port_needed=params[4], source=params[0])
413

    
414
  @staticmethod
415
  def perspective_node_info(params):
416
    """Query node information.
417

    
418
    """
419
    vgname, hypervisor_type = params
420
    return backend.GetNodeInfo(vgname, hypervisor_type)
421

    
422
  @staticmethod
423
  def perspective_node_add(params):
424
    """Complete the registration of this node in the cluster.
425

    
426
    """
427
    return backend.AddNode(params[0], params[1], params[2],
428
                           params[3], params[4], params[5])
429

    
430
  @staticmethod
431
  def perspective_node_verify(params):
432
    """Run a verify sequence on this node.
433

    
434
    """
435
    return backend.VerifyNode(params[0], params[1])
436

    
437
  @staticmethod
438
  def perspective_node_start_master(params):
439
    """Promote this node to master status.
440

    
441
    """
442
    return backend.StartMaster(params[0])
443

    
444
  @staticmethod
445
  def perspective_node_stop_master(params):
446
    """Demote this node from master status.
447

    
448
    """
449
    return backend.StopMaster(params[0])
450

    
451
  @staticmethod
452
  def perspective_node_leave_cluster(params):
453
    """Cleanup after leaving a cluster.
454

    
455
    """
456
    return backend.LeaveCluster()
457

    
458
  @staticmethod
459
  def perspective_node_volumes(params):
460
    """Query the list of all logical volume groups.
461

    
462
    """
463
    return backend.NodeVolumes()
464

    
465
  # cluster --------------------------
466

    
467
  @staticmethod
468
  def perspective_version(params):
469
    """Query version information.
470

    
471
    """
472
    return constants.PROTOCOL_VERSION
473

    
474
  @staticmethod
475
  def perspective_upload_file(params):
476
    """Upload a file.
477

    
478
    Note that the backend implementation imposes strict rules on which
479
    files are accepted.
480

    
481
    """
482
    return backend.UploadFile(*params)
483

    
484
  @staticmethod
485
  def perspective_master_info(params):
486
    """Query master information.
487

    
488
    """
489
    return backend.GetMasterInfo()
490

    
491
  # os -----------------------
492

    
493
  @staticmethod
494
  def perspective_os_diagnose(params):
495
    """Query detailed information about existing OSes.
496

    
497
    """
498
    return [os.ToDict() for os in backend.DiagnoseOS()]
499

    
500
  @staticmethod
501
  def perspective_os_get(params):
502
    """Query information about a given OS.
503

    
504
    """
505
    name = params[0]
506
    try:
507
      os_obj = backend.OSFromDisk(name)
508
    except errors.InvalidOS, err:
509
      os_obj = objects.OS.FromInvalidOS(err)
510
    return os_obj.ToDict()
511

    
512
  # hooks -----------------------
513

    
514
  @staticmethod
515
  def perspective_hooks_runner(params):
516
    """Run hook scripts.
517

    
518
    """
519
    hpath, phase, env = params
520
    hr = backend.HooksRunner()
521
    return hr.RunHooks(hpath, phase, env)
522

    
523
  # iallocator -----------------
524

    
525
  @staticmethod
526
  def perspective_iallocator_runner(params):
527
    """Run an iallocator script.
528

    
529
    """
530
    name, idata = params
531
    iar = backend.IAllocatorRunner()
532
    return iar.Run(name, idata)
533

    
534
  # test -----------------------
535

    
536
  @staticmethod
537
  def perspective_test_delay(params):
538
    """Run test delay.
539

    
540
    """
541
    duration = params[0]
542
    return utils.TestDelay(duration)
543

    
544
  # file storage ---------------
545

    
546
  @staticmethod
547
  def perspective_file_storage_dir_create(params):
548
    """Create the file storage directory.
549

    
550
    """
551
    file_storage_dir = params[0]
552
    return backend.CreateFileStorageDir(file_storage_dir)
553

    
554
  @staticmethod
555
  def perspective_file_storage_dir_remove(params):
556
    """Remove the file storage directory.
557

    
558
    """
559
    file_storage_dir = params[0]
560
    return backend.RemoveFileStorageDir(file_storage_dir)
561

    
562
  @staticmethod
563
  def perspective_file_storage_dir_rename(params):
564
    """Rename the file storage directory.
565

    
566
    """
567
    old_file_storage_dir = params[0]
568
    new_file_storage_dir = params[1]
569
    return backend.RenameFileStorageDir(old_file_storage_dir,
570
                                        new_file_storage_dir)
571

    
572
  # jobs ------------------------
573

    
574
  @staticmethod
575
  @_RequireJobQueueLock
576
  def perspective_jobqueue_update(params):
577
    """Update job queue.
578

    
579
    """
580
    (file_name, content) = params
581
    return backend.JobQueueUpdate(file_name, content)
582

    
583
  @staticmethod
584
  @_RequireJobQueueLock
585
  def perspective_jobqueue_purge(params):
586
    """Purge job queue.
587

    
588
    """
589
    return backend.JobQueuePurge()
590

    
591
  @staticmethod
592
  @_RequireJobQueueLock
593
  def perspective_jobqueue_rename(params):
594
    """Rename a job queue file.
595

    
596
    """
597
    (old, new) = params
598

    
599
    return backend.JobQueueRename(old, new)
600

    
601

    
602
def ParseOptions():
603
  """Parse the command line options.
604

    
605
  Returns:
606
    (options, args) as from OptionParser.parse_args()
607

    
608
  """
609
  parser = OptionParser(description="Ganeti node daemon",
610
                        usage="%prog [-f] [-d]",
611
                        version="%%prog (ganeti) %s" %
612
                        constants.RELEASE_VERSION)
613

    
614
  parser.add_option("-f", "--foreground", dest="fork",
615
                    help="Don't detach from the current terminal",
616
                    default=True, action="store_false")
617
  parser.add_option("-d", "--debug", dest="debug",
618
                    help="Enable some debug messages",
619
                    default=False, action="store_true")
620
  options, args = parser.parse_args()
621
  return options, args
622

    
623

    
624
def main():
625
  """Main function for the node daemon.
626

    
627
  """
628
  global queue_lock
629

    
630
  options, args = ParseOptions()
631
  utils.debug = options.debug
632
  for fname in (constants.SSL_CERT_FILE,):
633
    if not os.path.isfile(fname):
634
      print "config %s not there, will not run." % fname
635
      sys.exit(5)
636

    
637
  try:
638
    port = utils.GetNodeDaemonPort()
639
    pwdata = utils.GetNodeDaemonPassword()
640
  except errors.ConfigurationError, err:
641
    print "Cluster configuration incomplete: '%s'" % str(err)
642
    sys.exit(5)
643

    
644
  # create the various SUB_RUN_DIRS, if not existing, so that we handle the
645
  # situation where RUN_DIR is tmpfs
646
  for dir_name in constants.SUB_RUN_DIRS:
647
    if not os.path.exists(dir_name):
648
      try:
649
        os.mkdir(dir_name, 0755)
650
      except EnvironmentError, err:
651
        if err.errno != errno.EEXIST:
652
          print ("Node setup wrong, cannot create directory %s: %s" %
653
                 (dir_name, err))
654
          sys.exit(5)
655
    if not os.path.isdir(dir_name):
656
      print ("Node setup wrong, %s is not a directory" % dir_name)
657
      sys.exit(5)
658

    
659
  # become a daemon
660
  if options.fork:
661
    utils.Daemonize(logfile=constants.LOG_NODESERVER)
662

    
663
  utils.WritePidFile(constants.NODED_PID)
664
  try:
665
    logger.SetupLogging(logfile=constants.LOG_NODESERVER, debug=options.debug,
666
                        stderr_logging=not options.fork)
667
    logging.info("ganeti node daemon startup")
668

    
669
    # Prepare job queue
670
    queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
671

    
672
    mainloop = daemon.Mainloop()
673
    server = NodeHttpServer(mainloop, ("", port))
674
    server.Start()
675
    try:
676
      mainloop.Run()
677
    finally:
678
      server.Stop()
679
  finally:
680
    utils.RemovePidFile(constants.NODED_PID)
681

    
682

    
683
if __name__ == '__main__':
684
  main()