#!/usr/bin/python
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Ganeti node daemon"""

# functions in this module need to have a given name structure, so:
# pylint: disable-msg=C0103

import os
import sys
import traceback
import SocketServer
import errno
import logging
import signal

from optparse import OptionParser

from ganeti import backend
from ganeti import constants
from ganeti import objects
from ganeti import errors
from ganeti import jstore
from ganeti import daemon
from ganeti import http
from ganeti import utils

import ganeti.http.server


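# Job queue lock, shared by all job queue RPC handlers; it is initialized in
# ExecNODED and acquired through the _RequireJobQueueLock decorator.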
queue_lock = None


def _RequireJobQueueLock(fn):
  """Decorator for job queue manipulating functions.

  """
  QUEUE_LOCK_TIMEOUT = 10

  def wrapper(*args, **kwargs):
    # Locking in exclusive, blocking mode because there could be several
    # children running at the same time. Waiting up to 10 seconds.
    queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
    try:
      return fn(*args, **kwargs)
    finally:
      queue_lock.Unlock()

  return wrapper


class NodeHttpServer(http.server.HttpServer):
  """The server implementation.

  This class holds all methods exposed over the RPC interface.

  """
  def __init__(self, *args, **kwargs):
    http.server.HttpServer.__init__(self, *args, **kwargs)
    self.noded_pid = os.getpid()

  def HandleRequest(self, req):
    """Handle a request.

    """
    if req.request_method.upper() != http.HTTP_PUT:
      raise http.HttpBadRequest()

    path = req.request_path
    if path.startswith("/"):
      path = path[1:]

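    # RPC calls are dispatched by name: a PUT request to /foo is handled by
    # the perspective_foo method defined below.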
    method = getattr(self, "perspective_%s" % path, None)
    if method is None:
      raise http.HttpNotFound()

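    # Each handler returns a (success, payload) tuple; failures raised as
    # backend.RPCFail are converted to (False, error message) below.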
    try:
      rvalue = method(req.request_body)
      return True, rvalue

    except backend.RPCFail, err:
      # our custom failure exception; str(err) works fine if the
      # exception was constructed with a single argument, and in
      # this case, err.message == err.args[0] == str(err)
      return (False, str(err))
    except errors.QuitGanetiException, err:
      # Tell parent to quit
      logging.info("Shutting down the node daemon, arguments: %s",
                   str(err.args))
      os.kill(self.noded_pid, signal.SIGTERM)
      # And return the error's arguments, which must be already in
      # correct tuple format
      return err.args
    except Exception, err:
      logging.exception("Error in RPC call")
      return False, "Error while executing backend function: %s" % str(err)

  # the new block devices  --------------------------

  @staticmethod
  def perspective_blockdev_create(params):
    """Create a block device.

    """
    bdev_s, size, owner, on_primary, info = params
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevCreate(bdev, size, owner, on_primary, info)

  @staticmethod
  def perspective_blockdev_remove(params):
    """Remove a block device.

    """
    bdev_s = params[0]
    bdev = objects.Disk.FromDict(bdev_s)
    return backend.BlockdevRemove(bdev)

  @staticmethod
  def perspective_blockdev_rename(params):
    """Rename a block device.

    """
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
    return backend.BlockdevRename(devlist)

  @staticmethod
  def perspective_blockdev_assemble(params):
    """Assemble a block device.

    """
    bdev_s, owner, on_primary = params
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevAssemble(bdev, owner, on_primary)

  @staticmethod
  def perspective_blockdev_shutdown(params):
    """Shutdown a block device.

    """
    bdev_s = params[0]
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevShutdown(bdev)

  @staticmethod
  def perspective_blockdev_addchildren(params):
    """Add a child to a mirror device.

    Note: this is only valid for mirror devices. It's the caller's duty
    to send a correct disk, otherwise we raise an error.

    """
    bdev_s, ndev_s = params
    bdev = objects.Disk.FromDict(bdev_s)
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
    if bdev is None or ndevs.count(None) > 0:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevAddchildren(bdev, ndevs)

  @staticmethod
  def perspective_blockdev_removechildren(params):
    """Remove a child from a mirror device.

    This is only valid for mirror devices, of course. It's the caller's
    duty to send a correct disk, otherwise we raise an error.

    """
    bdev_s, ndev_s = params
    bdev = objects.Disk.FromDict(bdev_s)
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
    if bdev is None or ndevs.count(None) > 0:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevRemovechildren(bdev, ndevs)

  @staticmethod
  def perspective_blockdev_getmirrorstatus(params):
    """Return the mirror status for a list of disks.

    """
    disks = [objects.Disk.FromDict(dsk_s)
            for dsk_s in params]
    return backend.BlockdevGetmirrorstatus(disks)

  @staticmethod
  def perspective_blockdev_find(params):
    """Expose the FindBlockDevice functionality for a disk.

    This will try to find but not activate a disk.

    """
    disk = objects.Disk.FromDict(params[0])
    return backend.BlockdevFind(disk)

  @staticmethod
  def perspective_blockdev_snapshot(params):
    """Create a snapshot device.

    Note that this is only valid for LVM disks; if we get passed
    something else we raise an exception. The snapshot device can be
    removed by calling the generic block device remove call.

    """
    cfbd = objects.Disk.FromDict(params[0])
    return backend.BlockdevSnapshot(cfbd)

  @staticmethod
  def perspective_blockdev_grow(params):
    """Grow a stack of devices.

    """
    cfbd = objects.Disk.FromDict(params[0])
    amount = params[1]
    return backend.BlockdevGrow(cfbd, amount)

  @staticmethod
  def perspective_blockdev_close(params):
    """Closes the given block devices.

    """
    disks = [objects.Disk.FromDict(cf) for cf in params[1]]
    return backend.BlockdevClose(params[0], disks)

  # blockdev/drbd specific methods ----------

  @staticmethod
  def perspective_drbd_disconnect_net(params):
    """Disconnects the network connection of drbd disks.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    nodes_ip, disks = params
    disks = [objects.Disk.FromDict(cf) for cf in disks]
    return backend.DrbdDisconnectNet(nodes_ip, disks)

  @staticmethod
  def perspective_drbd_attach_net(params):
    """Attaches the network connection of drbd disks.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    nodes_ip, disks, instance_name, multimaster = params
    disks = [objects.Disk.FromDict(cf) for cf in disks]
    return backend.DrbdAttachNet(nodes_ip, disks,
                                     instance_name, multimaster)

  @staticmethod
  def perspective_drbd_wait_sync(params):
    """Wait until DRBD disks are synched.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    nodes_ip, disks = params
    disks = [objects.Disk.FromDict(cf) for cf in disks]
    return backend.DrbdWaitSync(nodes_ip, disks)

  # export/import  --------------------------

  @staticmethod
  def perspective_snapshot_export(params):
    """Export a given snapshot.

    """
    disk = objects.Disk.FromDict(params[0])
    dest_node = params[1]
    instance = objects.Instance.FromDict(params[2])
    cluster_name = params[3]
    dev_idx = params[4]
    return backend.ExportSnapshot(disk, dest_node, instance,
                                  cluster_name, dev_idx)

  @staticmethod
  def perspective_finalize_export(params):
    """Expose the finalize export functionality.

    """
    instance = objects.Instance.FromDict(params[0])
    snap_disks = [objects.Disk.FromDict(str_data)
                  for str_data in params[1]]
    return backend.FinalizeExport(instance, snap_disks)

  @staticmethod
  def perspective_export_info(params):
    """Query information about an existing export on this node.

    The given path may not contain an export, in which case we return
    None.

    """
    path = params[0]
    return backend.ExportInfo(path)

  @staticmethod
  def perspective_export_list(params):
    """List the available exports on this node.

    Note that as opposed to export_info, which may query data about an
    export in any path, this only queries the standard Ganeti path
    (constants.EXPORT_DIR).

    """
    return backend.ListExports()

  @staticmethod
  def perspective_export_remove(params):
    """Remove an export.

    """
    export = params[0]
    return backend.RemoveExport(export)

  # volume  --------------------------

  @staticmethod
  def perspective_lv_list(params):
    """Query the list of logical volumes in a given volume group.

    """
    vgname = params[0]
    return backend.GetVolumeList(vgname)

  @staticmethod
  def perspective_vg_list(params):
    """Query the list of volume groups.

    """
    return backend.ListVolumeGroups()

  # bridge  --------------------------

  @staticmethod
  def perspective_bridges_exist(params):
    """Check if all bridges given exist on this node.

    """
    bridges_list = params[0]
    return backend.BridgesExist(bridges_list)

  # instance  --------------------------

  @staticmethod
  def perspective_instance_os_add(params):
    """Install an OS on a given instance.

    """
    inst_s = params[0]
    inst = objects.Instance.FromDict(inst_s)
    reinstall = params[1]
    return backend.InstanceOsAdd(inst, reinstall)

  @staticmethod
  def perspective_instance_run_rename(params):
    """Runs the OS rename script for an instance.

    """
    inst_s, old_name = params
    inst = objects.Instance.FromDict(inst_s)
    return backend.RunRenameInstance(inst, old_name)

  @staticmethod
  def perspective_instance_os_import(params):
    """Run the import function of an OS onto a given instance.

    """
    inst_s, src_node, src_images, cluster_name = params
    inst = objects.Instance.FromDict(inst_s)
    return backend.ImportOSIntoInstance(inst, src_node, src_images,
                                        cluster_name)

  @staticmethod
  def perspective_instance_shutdown(params):
    """Shutdown an instance.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.InstanceShutdown(instance)

  @staticmethod
  def perspective_instance_start(params):
    """Start an instance.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.StartInstance(instance)

  @staticmethod
  def perspective_migration_info(params):
    """Gather information about an instance to be migrated.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.MigrationInfo(instance)

  @staticmethod
  def perspective_accept_instance(params):
    """Prepare the node to accept an instance.

    """
    instance, info, target = params
    instance = objects.Instance.FromDict(instance)
    return backend.AcceptInstance(instance, info, target)

  @staticmethod
  def perspective_finalize_migration(params):
    """Finalize the instance migration.

    """
    instance, info, success = params
    instance = objects.Instance.FromDict(instance)
    return backend.FinalizeMigration(instance, info, success)

  @staticmethod
  def perspective_instance_migrate(params):
    """Migrates an instance.

    """
    instance, target, live = params
    instance = objects.Instance.FromDict(instance)
    return backend.MigrateInstance(instance, target, live)

  @staticmethod
  def perspective_instance_reboot(params):
    """Reboot an instance.

    """
    instance = objects.Instance.FromDict(params[0])
    reboot_type = params[1]
    return backend.InstanceReboot(instance, reboot_type)

  @staticmethod
  def perspective_instance_info(params):
    """Query instance information.

    """
    return backend.GetInstanceInfo(params[0], params[1])

  @staticmethod
  def perspective_instance_migratable(params):
    """Query whether the specified instance can be migrated.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.GetInstanceMigratable(instance)

  @staticmethod
  def perspective_all_instances_info(params):
    """Query information about all instances.

    """
    return backend.GetAllInstancesInfo(params[0])

  @staticmethod
  def perspective_instance_list(params):
    """Query the list of running instances.

    """
    return backend.GetInstanceList(params[0])

  # node --------------------------

  @staticmethod
  def perspective_node_tcp_ping(params):
    """Do a TcpPing on the remote node.

    """
    return utils.TcpPing(params[1], params[2], timeout=params[3],
                         live_port_needed=params[4], source=params[0])

  @staticmethod
  def perspective_node_has_ip_address(params):
    """Checks if a node has the given IP address.

    """
    return utils.OwnIpAddress(params[0])

  @staticmethod
  def perspective_node_info(params):
    """Query node information.

    """
    vgname, hypervisor_type = params
    return backend.GetNodeInfo(vgname, hypervisor_type)

  @staticmethod
  def perspective_node_add(params):
    """Complete the registration of this node in the cluster.

    """
    return backend.AddNode(params[0], params[1], params[2],
                           params[3], params[4], params[5])

  @staticmethod
  def perspective_node_verify(params):
    """Run a verify sequence on this node.

    """
    return backend.VerifyNode(params[0], params[1])

  @staticmethod
  def perspective_node_start_master(params):
    """Promote this node to master status.

    """
    return backend.StartMaster(params[0], params[1])

  @staticmethod
  def perspective_node_stop_master(params):
    """Demote this node from master status.

    """
    return backend.StopMaster(params[0])

  @staticmethod
  def perspective_node_leave_cluster(params):
    """Cleanup after leaving a cluster.

    """
    return backend.LeaveCluster()

  @staticmethod
  def perspective_node_volumes(params):
    """Query the list of all logical volumes on this node.

    """
    return backend.NodeVolumes()

  @staticmethod
  def perspective_node_demote_from_mc(params):
    """Demote a node from the master candidate role.

    """
    return backend.DemoteFromMC()


  @staticmethod
  def perspective_node_powercycle(params):
    """Tries to powercycle the node.

    """
    hypervisor_type = params[0]
    return backend.PowercycleNode(hypervisor_type)


  # cluster --------------------------

  @staticmethod
  def perspective_version(params):
    """Query version information.

    """
    return constants.PROTOCOL_VERSION

  @staticmethod
  def perspective_upload_file(params):
    """Upload a file.

    Note that the backend implementation imposes strict rules on which
    files are accepted.

    """
    return backend.UploadFile(*params)

  @staticmethod
  def perspective_master_info(params):
    """Query master information.

    """
    return backend.GetMasterInfo()

  @staticmethod
  def perspective_write_ssconf_files(params):
    """Write ssconf files.

    """
    (values,) = params
    return backend.WriteSsconfFiles(values)

  # os -----------------------

  @staticmethod
  def perspective_os_diagnose(params):
    """Query detailed information about existing OSes.

    """
    return backend.DiagnoseOS()

  @staticmethod
  def perspective_os_get(params):
    """Query information about a given OS.

    """
    name = params[0]
    os_obj = backend.OSFromDisk(name)
    return os_obj.ToDict()

  # hooks -----------------------

  @staticmethod
  def perspective_hooks_runner(params):
    """Run hook scripts.

    """
    hpath, phase, env = params
    hr = backend.HooksRunner()
    return hr.RunHooks(hpath, phase, env)

  # iallocator -----------------

  @staticmethod
  def perspective_iallocator_runner(params):
    """Run an iallocator script.

    """
    name, idata = params
    iar = backend.IAllocatorRunner()
    return iar.Run(name, idata)

  # test -----------------------

  @staticmethod
  def perspective_test_delay(params):
    """Run test delay.

    """
    duration = params[0]
    status, rval = utils.TestDelay(duration)
    if not status:
      raise backend.RPCFail(rval)
    return rval

  # file storage ---------------

  @staticmethod
  def perspective_file_storage_dir_create(params):
    """Create the file storage directory.

    """
    file_storage_dir = params[0]
    return backend.CreateFileStorageDir(file_storage_dir)

  @staticmethod
  def perspective_file_storage_dir_remove(params):
    """Remove the file storage directory.

    """
    file_storage_dir = params[0]
    return backend.RemoveFileStorageDir(file_storage_dir)

  @staticmethod
  def perspective_file_storage_dir_rename(params):
    """Rename the file storage directory.

    """
    old_file_storage_dir = params[0]
    new_file_storage_dir = params[1]
    return backend.RenameFileStorageDir(old_file_storage_dir,
                                        new_file_storage_dir)

  # jobs ------------------------

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_update(params):
    """Update job queue.

    """
    (file_name, content) = params
    return backend.JobQueueUpdate(file_name, content)

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_purge(params):
    """Purge job queue.

    """
    return backend.JobQueuePurge()

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_rename(params):
    """Rename a job queue file.

    """
    # TODO: What if a file fails to rename?
    return [backend.JobQueueRename(old, new) for old, new in params]

  @staticmethod
  def perspective_jobqueue_set_drain(params):
    """Set/unset the queue drain flag.

    """
    drain_flag = params[0]
    return backend.JobQueueSetDrainFlag(drain_flag)


  # hypervisor ---------------

  @staticmethod
  def perspective_hypervisor_validate_params(params):
    """Validate the hypervisor parameters.

    """
    (hvname, hvparams) = params
    return backend.ValidateHVParams(hvname, hvparams)


def CheckNODED(options, args):
  """Initial checks whether to run or exit with a failure.

  """
  for fname in (constants.SSL_CERT_FILE,):
    if not os.path.isfile(fname):
      print "config %s not there, will not run." % fname
      sys.exit(constants.EXIT_NOTCLUSTER)


def ExecNODED(options, args):
  """Main NODED function, executed with the pidfile held.

  """
  global queue_lock

  # Read SSL certificate
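  # (the same file is used for both the private key and the certificate)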
  ssl_params = http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
                                  ssl_cert_path=constants.SSL_CERT_FILE)

  # Prepare job queue
  queue_lock = jstore.InitAndVerifyQueue(must_lock=False)

  mainloop = daemon.Mainloop()
  server = NodeHttpServer(mainloop, options.bind_address, options.port,
                          ssl_params=ssl_params, ssl_verify_peer=True)
  server.Start()
  try:
    mainloop.Run()
  finally:
    server.Stop()


def main():
  """Main function for the node daemon.

  """
  parser = OptionParser(description="Ganeti node daemon",
                        usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
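  # LOG_OS_DIR is restricted to owner and group (0750), while LOCK_DIR is
  # world-writable with the sticky bit set (1777), like /tmp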
  dirs.append((constants.LOG_OS_DIR, 0750))
  dirs.append((constants.LOCK_DIR, 1777))
  daemon.GenericMain(constants.NODED, parser, dirs, CheckNODED, ExecNODED)


if __name__ == '__main__':
  main()