Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-noded @ 03d1dba2

History | View | Annotate | Download (18.6 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti node daemon"""
23

    
24
# functions in this module need to have a given name structure, so:
25
# pylint: disable-msg=C0103
26

    
27
import os
28
import sys
29
import traceback
30
import SocketServer
31
import errno
32
import logging
33
import signal
34

    
35
from optparse import OptionParser
36

    
37
from ganeti import backend
38
from ganeti import constants
39
from ganeti import objects
40
from ganeti import errors
41
from ganeti import jstore
42
from ganeti import daemon
43
from ganeti import http
44
from ganeti import utils
45

    
46

    
47
queue_lock = None
48

    
49

    
50
def _RequireJobQueueLock(fn):
51
  """Decorator for job queue manipulating functions.
52

    
53
  """
54
  QUEUE_LOCK_TIMEOUT = 10
55

    
56
  def wrapper(*args, **kwargs):
57
    # Locking in exclusive, blocking mode because there could be several
58
    # children running at the same time. Waiting up to 10 seconds.
59
    queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
60
    try:
61
      return fn(*args, **kwargs)
62
    finally:
63
      queue_lock.Unlock()
64

    
65
  return wrapper
66

    
67

    
68
class NodeHttpServer(http.HttpServer):
69
  """The server implementation.
70

    
71
  This class holds all methods exposed over the RPC interface.
72

    
73
  """
74
  def __init__(self, *args, **kwargs):
75
    http.HttpServer.__init__(self, *args, **kwargs)
76
    self.noded_pid = os.getpid()
77

    
78
  def HandleRequest(self, req):
79
    """Handle a request.
80

    
81
    """
82
    if req.request_method.upper() != "PUT":
83
      raise http.HTTPBadRequest()
84

    
85
    path = req.request_path
86
    if path.startswith("/"):
87
      path = path[1:]
88

    
89
    method = getattr(self, "perspective_%s" % path, None)
90
    if method is None:
91
      raise http.HTTPNotFound()
92

    
93
    try:
94
      try:
95
        return method(req.request_post_data)
96
      except:
97
        logging.exception("Error in RPC call")
98
        raise
99
    except errors.QuitGanetiException, err:
100
      # Tell parent to quit
101
      os.kill(self.noded_pid, signal.SIGTERM)
102

    
103
  # the new block devices  --------------------------
104

    
105
  @staticmethod
106
  def perspective_blockdev_create(params):
107
    """Create a block device.
108

    
109
    """
110
    bdev_s, size, owner, on_primary, info = params
111
    bdev = objects.Disk.FromDict(bdev_s)
112
    if bdev is None:
113
      raise ValueError("can't unserialize data!")
114
    return backend.CreateBlockDevice(bdev, size, owner, on_primary, info)
115

    
116
  @staticmethod
117
  def perspective_blockdev_remove(params):
118
    """Remove a block device.
119

    
120
    """
121
    bdev_s = params[0]
122
    bdev = objects.Disk.FromDict(bdev_s)
123
    return backend.RemoveBlockDevice(bdev)
124

    
125
  @staticmethod
126
  def perspective_blockdev_rename(params):
127
    """Remove a block device.
128

    
129
    """
130
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
131
    return backend.RenameBlockDevices(devlist)
132

    
133
  @staticmethod
134
  def perspective_blockdev_assemble(params):
135
    """Assemble a block device.
136

    
137
    """
138
    bdev_s, owner, on_primary = params
139
    bdev = objects.Disk.FromDict(bdev_s)
140
    if bdev is None:
141
      raise ValueError("can't unserialize data!")
142
    return backend.AssembleBlockDevice(bdev, owner, on_primary)
143

    
144
  @staticmethod
145
  def perspective_blockdev_shutdown(params):
146
    """Shutdown a block device.
147

    
148
    """
149
    bdev_s = params[0]
150
    bdev = objects.Disk.FromDict(bdev_s)
151
    if bdev is None:
152
      raise ValueError("can't unserialize data!")
153
    return backend.ShutdownBlockDevice(bdev)
154

    
155
  @staticmethod
156
  def perspective_blockdev_addchildren(params):
157
    """Add a child to a mirror device.
158

    
159
    Note: this is only valid for mirror devices. It's the caller's duty
160
    to send a correct disk, otherwise we raise an error.
161

    
162
    """
163
    bdev_s, ndev_s = params
164
    bdev = objects.Disk.FromDict(bdev_s)
165
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
166
    if bdev is None or ndevs.count(None) > 0:
167
      raise ValueError("can't unserialize data!")
168
    return backend.MirrorAddChildren(bdev, ndevs)
169

    
170
  @staticmethod
171
  def perspective_blockdev_removechildren(params):
172
    """Remove a child from a mirror device.
173

    
174
    This is only valid for mirror devices, of course. It's the callers
175
    duty to send a correct disk, otherwise we raise an error.
176

    
177
    """
178
    bdev_s, ndev_s = params
179
    bdev = objects.Disk.FromDict(bdev_s)
180
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
181
    if bdev is None or ndevs.count(None) > 0:
182
      raise ValueError("can't unserialize data!")
183
    return backend.MirrorRemoveChildren(bdev, ndevs)
184

    
185
  @staticmethod
186
  def perspective_blockdev_getmirrorstatus(params):
187
    """Return the mirror status for a list of disks.
188

    
189
    """
190
    disks = [objects.Disk.FromDict(dsk_s)
191
            for dsk_s in params]
192
    return backend.GetMirrorStatus(disks)
193

    
194
  @staticmethod
195
  def perspective_blockdev_find(params):
196
    """Expose the FindBlockDevice functionality for a disk.
197

    
198
    This will try to find but not activate a disk.
199

    
200
    """
201
    disk = objects.Disk.FromDict(params[0])
202
    return backend.FindBlockDevice(disk)
203

    
204
  @staticmethod
205
  def perspective_blockdev_snapshot(params):
206
    """Create a snapshot device.
207

    
208
    Note that this is only valid for LVM disks, if we get passed
209
    something else we raise an exception. The snapshot device can be
210
    remove by calling the generic block device remove call.
211

    
212
    """
213
    cfbd = objects.Disk.FromDict(params[0])
214
    return backend.SnapshotBlockDevice(cfbd)
215

    
216
  @staticmethod
217
  def perspective_blockdev_grow(params):
218
    """Grow a stack of devices.
219

    
220
    """
221
    cfbd = objects.Disk.FromDict(params[0])
222
    amount = params[1]
223
    return backend.GrowBlockDevice(cfbd, amount)
224

    
225
  @staticmethod
226
  def perspective_blockdev_close(params):
227
    """Closes the given block devices.
228

    
229
    """
230
    disks = [objects.Disk.FromDict(cf) for cf in params]
231
    return backend.CloseBlockDevices(disks)
232

    
233
  # export/import  --------------------------
234

    
235
  @staticmethod
236
  def perspective_snapshot_export(params):
237
    """Export a given snapshot.
238

    
239
    """
240
    disk = objects.Disk.FromDict(params[0])
241
    dest_node = params[1]
242
    instance = objects.Instance.FromDict(params[2])
243
    cluster_name = params[3]
244
    dev_idx = params[4]
245
    return backend.ExportSnapshot(disk, dest_node, instance,
246
                                  cluster_name, dev_idx)
247

    
248
  @staticmethod
249
  def perspective_finalize_export(params):
250
    """Expose the finalize export functionality.
251

    
252
    """
253
    instance = objects.Instance.FromDict(params[0])
254
    snap_disks = [objects.Disk.FromDict(str_data)
255
                  for str_data in params[1]]
256
    return backend.FinalizeExport(instance, snap_disks)
257

    
258
  @staticmethod
259
  def perspective_export_info(params):
260
    """Query information about an existing export on this node.
261

    
262
    The given path may not contain an export, in which case we return
263
    None.
264

    
265
    """
266
    path = params[0]
267
    einfo = backend.ExportInfo(path)
268
    if einfo is None:
269
      return einfo
270
    return einfo.Dumps()
271

    
272
  @staticmethod
273
  def perspective_export_list(params):
274
    """List the available exports on this node.
275

    
276
    Note that as opposed to export_info, which may query data about an
277
    export in any path, this only queries the standard Ganeti path
278
    (constants.EXPORT_DIR).
279

    
280
    """
281
    return backend.ListExports()
282

    
283
  @staticmethod
284
  def perspective_export_remove(params):
285
    """Remove an export.
286

    
287
    """
288
    export = params[0]
289
    return backend.RemoveExport(export)
290

    
291
  # volume  --------------------------
292

    
293
  @staticmethod
294
  def perspective_volume_list(params):
295
    """Query the list of logical volumes in a given volume group.
296

    
297
    """
298
    vgname = params[0]
299
    return backend.GetVolumeList(vgname)
300

    
301
  @staticmethod
302
  def perspective_vg_list(params):
303
    """Query the list of volume groups.
304

    
305
    """
306
    return backend.ListVolumeGroups()
307

    
308
  # bridge  --------------------------
309

    
310
  @staticmethod
311
  def perspective_bridges_exist(params):
312
    """Check if all bridges given exist on this node.
313

    
314
    """
315
    bridges_list = params[0]
316
    return backend.BridgesExist(bridges_list)
317

    
318
  # instance  --------------------------
319

    
320
  @staticmethod
321
  def perspective_instance_os_add(params):
322
    """Install an OS on a given instance.
323

    
324
    """
325
    inst_s = params[0]
326
    inst = objects.Instance.FromDict(inst_s)
327
    return backend.AddOSToInstance(inst)
328

    
329
  @staticmethod
330
  def perspective_instance_run_rename(params):
331
    """Runs the OS rename script for an instance.
332

    
333
    """
334
    inst_s, old_name = params
335
    inst = objects.Instance.FromDict(inst_s)
336
    return backend.RunRenameInstance(inst, old_name)
337

    
338
  @staticmethod
339
  def perspective_instance_os_import(params):
340
    """Run the import function of an OS onto a given instance.
341

    
342
    """
343
    inst_s, src_node, src_images, cluster_name = params
344
    inst = objects.Instance.FromDict(inst_s)
345
    return backend.ImportOSIntoInstance(inst, src_node, src_images,
346
                                        cluster_name)
347

    
348
  @staticmethod
349
  def perspective_instance_shutdown(params):
350
    """Shutdown an instance.
351

    
352
    """
353
    instance = objects.Instance.FromDict(params[0])
354
    return backend.ShutdownInstance(instance)
355

    
356
  @staticmethod
357
  def perspective_instance_start(params):
358
    """Start an instance.
359

    
360
    """
361
    instance = objects.Instance.FromDict(params[0])
362
    extra_args = params[1]
363
    return backend.StartInstance(instance, extra_args)
364

    
365
  @staticmethod
366
  def perspective_instance_migrate(params):
367
    """Migrates an instance.
368

    
369
    """
370
    instance, target, live = params
371
    instance = objects.Instance.FromDict(instance)
372
    return backend.MigrateInstance(instance, target, live)
373

    
374
  @staticmethod
375
  def perspective_instance_reboot(params):
376
    """Reboot an instance.
377

    
378
    """
379
    instance = objects.Instance.FromDict(params[0])
380
    reboot_type = params[1]
381
    extra_args = params[2]
382
    return backend.RebootInstance(instance, reboot_type, extra_args)
383

    
384
  @staticmethod
385
  def perspective_instance_info(params):
386
    """Query instance information.
387

    
388
    """
389
    return backend.GetInstanceInfo(params[0], params[1])
390

    
391
  @staticmethod
392
  def perspective_all_instances_info(params):
393
    """Query information about all instances.
394

    
395
    """
396
    return backend.GetAllInstancesInfo(params[0])
397

    
398
  @staticmethod
399
  def perspective_instance_list(params):
400
    """Query the list of running instances.
401

    
402
    """
403
    return backend.GetInstanceList(params[0])
404

    
405
  # node --------------------------
406

    
407
  @staticmethod
408
  def perspective_node_tcp_ping(params):
409
    """Do a TcpPing on the remote node.
410

    
411
    """
412
    return utils.TcpPing(params[1], params[2], timeout=params[3],
413
                         live_port_needed=params[4], source=params[0])
414

    
415
  @staticmethod
416
  def perspective_node_has_ip_address(params):
417
    """Checks if a node has the given ip address.
418

    
419
    """
420
    return utils.OwnIpAddress(params[0])
421

    
422
  @staticmethod
423
  def perspective_node_info(params):
424
    """Query node information.
425

    
426
    """
427
    vgname, hypervisor_type = params
428
    return backend.GetNodeInfo(vgname, hypervisor_type)
429

    
430
  @staticmethod
431
  def perspective_node_add(params):
432
    """Complete the registration of this node in the cluster.
433

    
434
    """
435
    return backend.AddNode(params[0], params[1], params[2],
436
                           params[3], params[4], params[5])
437

    
438
  @staticmethod
439
  def perspective_node_verify(params):
440
    """Run a verify sequence on this node.
441

    
442
    """
443
    return backend.VerifyNode(params[0], params[1])
444

    
445
  @staticmethod
446
  def perspective_node_start_master(params):
447
    """Promote this node to master status.
448

    
449
    """
450
    return backend.StartMaster(params[0])
451

    
452
  @staticmethod
453
  def perspective_node_stop_master(params):
454
    """Demote this node from master status.
455

    
456
    """
457
    return backend.StopMaster(params[0])
458

    
459
  @staticmethod
460
  def perspective_node_leave_cluster(params):
461
    """Cleanup after leaving a cluster.
462

    
463
    """
464
    return backend.LeaveCluster()
465

    
466
  @staticmethod
467
  def perspective_node_volumes(params):
468
    """Query the list of all logical volume groups.
469

    
470
    """
471
    return backend.NodeVolumes()
472

    
473
  # cluster --------------------------
474

    
475
  @staticmethod
476
  def perspective_version(params):
477
    """Query version information.
478

    
479
    """
480
    return constants.PROTOCOL_VERSION
481

    
482
  @staticmethod
483
  def perspective_upload_file(params):
484
    """Upload a file.
485

    
486
    Note that the backend implementation imposes strict rules on which
487
    files are accepted.
488

    
489
    """
490
    return backend.UploadFile(*params)
491

    
492
  @staticmethod
493
  def perspective_master_info(params):
494
    """Query master information.
495

    
496
    """
497
    return backend.GetMasterInfo()
498

    
499
  @staticmethod
500
  def perspective_write_ssconf_files(params):
501
    """Write ssconf files.
502

    
503
    """
504
    (values,) = params
505
    return backend.WriteSsconfFiles(values)
506

    
507
  # os -----------------------
508

    
509
  @staticmethod
510
  def perspective_os_diagnose(params):
511
    """Query detailed information about existing OSes.
512

    
513
    """
514
    return [os.ToDict() for os in backend.DiagnoseOS()]
515

    
516
  @staticmethod
517
  def perspective_os_get(params):
518
    """Query information about a given OS.
519

    
520
    """
521
    name = params[0]
522
    try:
523
      os_obj = backend.OSFromDisk(name)
524
    except errors.InvalidOS, err:
525
      os_obj = objects.OS.FromInvalidOS(err)
526
    return os_obj.ToDict()
527

    
528
  # hooks -----------------------
529

    
530
  @staticmethod
531
  def perspective_hooks_runner(params):
532
    """Run hook scripts.
533

    
534
    """
535
    hpath, phase, env = params
536
    hr = backend.HooksRunner()
537
    return hr.RunHooks(hpath, phase, env)
538

    
539
  # iallocator -----------------
540

    
541
  @staticmethod
542
  def perspective_iallocator_runner(params):
543
    """Run an iallocator script.
544

    
545
    """
546
    name, idata = params
547
    iar = backend.IAllocatorRunner()
548
    return iar.Run(name, idata)
549

    
550
  # test -----------------------
551

    
552
  @staticmethod
553
  def perspective_test_delay(params):
554
    """Run test delay.
555

    
556
    """
557
    duration = params[0]
558
    return utils.TestDelay(duration)
559

    
560
  # file storage ---------------
561

    
562
  @staticmethod
563
  def perspective_file_storage_dir_create(params):
564
    """Create the file storage directory.
565

    
566
    """
567
    file_storage_dir = params[0]
568
    return backend.CreateFileStorageDir(file_storage_dir)
569

    
570
  @staticmethod
571
  def perspective_file_storage_dir_remove(params):
572
    """Remove the file storage directory.
573

    
574
    """
575
    file_storage_dir = params[0]
576
    return backend.RemoveFileStorageDir(file_storage_dir)
577

    
578
  @staticmethod
579
  def perspective_file_storage_dir_rename(params):
580
    """Rename the file storage directory.
581

    
582
    """
583
    old_file_storage_dir = params[0]
584
    new_file_storage_dir = params[1]
585
    return backend.RenameFileStorageDir(old_file_storage_dir,
586
                                        new_file_storage_dir)
587

    
588
  # jobs ------------------------
589

    
590
  @staticmethod
591
  @_RequireJobQueueLock
592
  def perspective_jobqueue_update(params):
593
    """Update job queue.
594

    
595
    """
596
    (file_name, content) = params
597
    return backend.JobQueueUpdate(file_name, content)
598

    
599
  @staticmethod
600
  @_RequireJobQueueLock
601
  def perspective_jobqueue_purge(params):
602
    """Purge job queue.
603

    
604
    """
605
    return backend.JobQueuePurge()
606

    
607
  @staticmethod
608
  @_RequireJobQueueLock
609
  def perspective_jobqueue_rename(params):
610
    """Rename a job queue file.
611

    
612
    """
613
    (old, new) = params
614

    
615
    return backend.JobQueueRename(old, new)
616

    
617
  @staticmethod
618
  def perspective_jobqueue_set_drain(params):
619
    """Set/unset the queue drain flag.
620

    
621
    """
622
    drain_flag = params[0]
623
    return backend.JobQueueSetDrainFlag(drain_flag)
624

    
625

    
626
  # hypervisor ---------------
627

    
628
  @staticmethod
629
  def perspective_hypervisor_validate_params(params):
630
    """Validate the hypervisor parameters.
631

    
632
    """
633
    (hvname, hvparams) = params
634
    return backend.ValidateHVParams(hvname, hvparams)
635

    
636

    
637
def ParseOptions():
638
  """Parse the command line options.
639

    
640
  Returns:
641
    (options, args) as from OptionParser.parse_args()
642

    
643
  """
644
  parser = OptionParser(description="Ganeti node daemon",
645
                        usage="%prog [-f] [-d]",
646
                        version="%%prog (ganeti) %s" %
647
                        constants.RELEASE_VERSION)
648

    
649
  parser.add_option("-f", "--foreground", dest="fork",
650
                    help="Don't detach from the current terminal",
651
                    default=True, action="store_false")
652
  parser.add_option("-d", "--debug", dest="debug",
653
                    help="Enable some debug messages",
654
                    default=False, action="store_true")
655
  options, args = parser.parse_args()
656
  return options, args
657

    
658

    
659
def EnsureRuntimeEnvironment():
660
  """Ensure our run-time environment is complete.
661

    
662
  Currently this creates directories which could be missing, either
663
  due to directories being on a tmpfs mount, or due to incomplete
664
  packaging.
665

    
666
  """
667
  dirs = [(val, 0755) for val in constants.SUB_RUN_DIRS]
668
  dirs.append((constants.LOG_OS_DIR, 0750))
669
  for dir_name, dir_mode in dirs:
670
    if not os.path.exists(dir_name):
671
      try:
672
        os.mkdir(dir_name, dir_mode)
673
      except EnvironmentError, err:
674
        if err.errno != errno.EEXIST:
675
          print ("Node setup wrong, cannot create directory '%s': %s" %
676
                 (dir_name, err))
677
          sys.exit(5)
678
    if not os.path.isdir(dir_name):
679
      print ("Node setup wrong, '%s' is not a directory" % dir_name)
680
      sys.exit(5)
681

    
682

    
683
def main():
684
  """Main function for the node daemon.
685

    
686
  """
687
  global queue_lock
688

    
689
  options, args = ParseOptions()
690
  utils.debug = options.debug
691
  for fname in (constants.SSL_CERT_FILE,):
692
    if not os.path.isfile(fname):
693
      print "config %s not there, will not run." % fname
694
      sys.exit(5)
695

    
696
  try:
697
    port = utils.GetNodeDaemonPort()
698
  except errors.ConfigurationError, err:
699
    print "Cluster configuration incomplete: '%s'" % str(err)
700
    sys.exit(5)
701

    
702
  EnsureRuntimeEnvironment()
703

    
704
  # become a daemon
705
  if options.fork:
706
    utils.Daemonize(logfile=constants.LOG_NODESERVER)
707

    
708
  utils.WritePidFile(constants.NODED_PID)
709
  try:
710
    utils.SetupLogging(logfile=constants.LOG_NODESERVER, debug=options.debug,
711
                       stderr_logging=not options.fork)
712
    logging.info("ganeti node daemon startup")
713

    
714
    # Read SSL certificate
715
    ssl_params = http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
716
                                    ssl_cert_path=constants.SSL_CERT_FILE)
717

    
718
    # Prepare job queue
719
    queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
720

    
721
    mainloop = daemon.Mainloop()
722
    server = NodeHttpServer(mainloop, "", port,
723
                            ssl_params=ssl_params, ssl_verify_peer=True)
724
    server.Start()
725
    try:
726
      mainloop.Run()
727
    finally:
728
      server.Stop()
729
  finally:
730
    utils.RemovePidFile(constants.NODED_PID)
731

    
732

    
733
if __name__ == '__main__':
734
  main()