#!/usr/bin/python
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Ganeti node daemon"""

# pylint: disable-msg=C0103,W0142

# C0103: Functions in this module need to have a given name structure,
# and the name of the daemon doesn't match

# W0142: Used * or ** magic, since we do use it extensively in this
# module

import os
import sys
import logging
import signal

from optparse import OptionParser

from ganeti import backend
from ganeti import constants
from ganeti import objects
from ganeti import errors
from ganeti import jstore
from ganeti import daemon
from ganeti import http
from ganeti import utils
from ganeti import storage
from ganeti import serializer

import ganeti.http.server # pylint: disable-msg=W0611


queue_lock = None


def _PrepareQueueLock():
  """Try to prepare the queue lock.

  @return: None for success, otherwise an exception object

  """
  global queue_lock # pylint: disable-msg=W0603

  if queue_lock is not None:
    return None

  # Prepare job queue
  try:
    queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
    return None
  except EnvironmentError, err:
    return err


def _RequireJobQueueLock(fn):
  """Decorator for job queue manipulating functions.

  """
  QUEUE_LOCK_TIMEOUT = 10

  def wrapper(*args, **kwargs):
    # Locking in exclusive, blocking mode because there could be several
    # children running at the same time. Waiting up to 10 seconds.
    if _PrepareQueueLock() is not None:
      raise errors.JobQueueError("Job queue failed initialization,"
                                 " cannot update jobs")
    queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
    try:
      return fn(*args, **kwargs)
    finally:
      queue_lock.Unlock()

  return wrapper


class NodeHttpServer(http.server.HttpServer):
  """The server implementation.

  This class holds all methods exposed over the RPC interface.

  """
  # too many public methods, and unused args - all methods get params
  # due to the API
  # pylint: disable-msg=R0904,W0613
  def __init__(self, *args, **kwargs):
    http.server.HttpServer.__init__(self, *args, **kwargs)
    self.noded_pid = os.getpid()

  def HandleRequest(self, req):
    """Handle a request.

    """
    if req.request_method.upper() != http.HTTP_PUT:
      raise http.HttpBadRequest()

    path = req.request_path
    if path.startswith("/"):
      path = path[1:]

    method = getattr(self, "perspective_%s" % path, None)
    if method is None:
      raise http.HttpNotFound()

    try:
      result = (True, method(serializer.LoadJson(req.request_body)))

    except backend.RPCFail, err:
      # our custom failure exception; str(err) works fine if the
      # exception was constructed with a single argument, and in
      # this case, err.message == err.args[0] == str(err)
      result = (False, str(err))
    except errors.QuitGanetiException, err:
      # Tell parent to quit
      logging.info("Shutting down the node daemon, arguments: %s",
                   str(err.args))
      os.kill(self.noded_pid, signal.SIGTERM)
      # And return the error's arguments, which must be already in
      # correct tuple format
      result = err.args
    except Exception, err:
      logging.exception("Error in RPC call")
      result = (False, "Error while executing backend function: %s" % str(err))

    return serializer.DumpJson(result, indent=False)

  # the new block devices  --------------------------

  @staticmethod
  def perspective_blockdev_create(params):
    """Create a block device.

    """
    bdev_s, size, owner, on_primary, info = params
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevCreate(bdev, size, owner, on_primary, info)

  @staticmethod
  def perspective_blockdev_remove(params):
    """Remove a block device.

    """
    bdev_s = params[0]
    bdev = objects.Disk.FromDict(bdev_s)
    return backend.BlockdevRemove(bdev)

  @staticmethod
  def perspective_blockdev_rename(params):
    """Rename a block device.

    """
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
    return backend.BlockdevRename(devlist)

  @staticmethod
  def perspective_blockdev_assemble(params):
    """Assemble a block device.

    """
    bdev_s, owner, on_primary = params
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevAssemble(bdev, owner, on_primary)

  @staticmethod
  def perspective_blockdev_shutdown(params):
    """Shutdown a block device.

    """
    bdev_s = params[0]
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevShutdown(bdev)

  @staticmethod
  def perspective_blockdev_addchildren(params):
    """Add a child to a mirror device.

    Note: this is only valid for mirror devices. It's the caller's duty
    to send a correct disk, otherwise we raise an error.

    """
    bdev_s, ndev_s = params
    bdev = objects.Disk.FromDict(bdev_s)
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
    if bdev is None or ndevs.count(None) > 0:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevAddchildren(bdev, ndevs)

  @staticmethod
  def perspective_blockdev_removechildren(params):
    """Remove a child from a mirror device.

    This is only valid for mirror devices, of course. It's the caller's
    duty to send a correct disk, otherwise we raise an error.

    """
    bdev_s, ndev_s = params
    bdev = objects.Disk.FromDict(bdev_s)
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
    if bdev is None or ndevs.count(None) > 0:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevRemovechildren(bdev, ndevs)

  @staticmethod
  def perspective_blockdev_getmirrorstatus(params):
    """Return the mirror status for a list of disks.

    """
    disks = [objects.Disk.FromDict(dsk_s)
             for dsk_s in params]
    return [status.ToDict()
            for status in backend.BlockdevGetmirrorstatus(disks)]

  @staticmethod
  def perspective_blockdev_find(params):
    """Expose the FindBlockDevice functionality for a disk.

    This will try to find but not activate a disk.

    """
    disk = objects.Disk.FromDict(params[0])

    result = backend.BlockdevFind(disk)
    if result is None:
      return None

    return result.ToDict()

  @staticmethod
  def perspective_blockdev_snapshot(params):
    """Create a snapshot device.

    Note that this is only valid for LVM disks; if we get passed
    something else, we raise an exception. The snapshot device can be
    removed by calling the generic block device remove call.

    """
    cfbd = objects.Disk.FromDict(params[0])
    return backend.BlockdevSnapshot(cfbd)

  @staticmethod
  def perspective_blockdev_grow(params):
    """Grow a stack of devices.

    """
    cfbd = objects.Disk.FromDict(params[0])
    amount = params[1]
    return backend.BlockdevGrow(cfbd, amount)

  @staticmethod
  def perspective_blockdev_close(params):
    """Closes the given block devices.

    """
    disks = [objects.Disk.FromDict(cf) for cf in params[1]]
    return backend.BlockdevClose(params[0], disks)

  @staticmethod
  def perspective_blockdev_getsize(params):
    """Compute the sizes of the given block devices.

    """
    disks = [objects.Disk.FromDict(cf) for cf in params[0]]
    return backend.BlockdevGetsize(disks)

  @staticmethod
  def perspective_blockdev_export(params):
    """Export a block device to a remote node.

    """
    disk = objects.Disk.FromDict(params[0])
    dest_node, dest_path, cluster_name = params[1:]
    return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)

  # blockdev/drbd specific methods ----------

  @staticmethod
  def perspective_drbd_disconnect_net(params):
    """Disconnects the network connection of drbd disks.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    nodes_ip, disks = params
    disks = [objects.Disk.FromDict(cf) for cf in disks]
    return backend.DrbdDisconnectNet(nodes_ip, disks)

  @staticmethod
  def perspective_drbd_attach_net(params):
    """Attaches the network connection of drbd disks.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    nodes_ip, disks, instance_name, multimaster = params
    disks = [objects.Disk.FromDict(cf) for cf in disks]
    return backend.DrbdAttachNet(nodes_ip, disks,
                                 instance_name, multimaster)

  @staticmethod
  def perspective_drbd_wait_sync(params):
    """Wait until DRBD disks are synched.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    nodes_ip, disks = params
    disks = [objects.Disk.FromDict(cf) for cf in disks]
    return backend.DrbdWaitSync(nodes_ip, disks)

  # export/import  --------------------------

  @staticmethod
  def perspective_snapshot_export(params):
    """Export a given snapshot.

    """
    disk = objects.Disk.FromDict(params[0])
    dest_node = params[1]
    instance = objects.Instance.FromDict(params[2])
    cluster_name = params[3]
    dev_idx = params[4]
    debug = params[5]
    return backend.ExportSnapshot(disk, dest_node, instance,
                                  cluster_name, dev_idx, debug)

  @staticmethod
  def perspective_finalize_export(params):
    """Expose the finalize export functionality.

    """
    instance = objects.Instance.FromDict(params[0])

    snap_disks = []
    for disk in params[1]:
      if isinstance(disk, bool):
        snap_disks.append(disk)
      else:
        snap_disks.append(objects.Disk.FromDict(disk))

    return backend.FinalizeExport(instance, snap_disks)

  @staticmethod
  def perspective_export_info(params):
    """Query information about an existing export on this node.

    The given path may not contain an export, in which case we return
    None.

    """
    path = params[0]
    return backend.ExportInfo(path)

  @staticmethod
  def perspective_export_list(params):
    """List the available exports on this node.

    Note that as opposed to export_info, which may query data about an
    export in any path, this only queries the standard Ganeti path
    (constants.EXPORT_DIR).

    """
    return backend.ListExports()

  @staticmethod
  def perspective_export_remove(params):
    """Remove an export.

    """
    export = params[0]
    return backend.RemoveExport(export)

  # volume  --------------------------

  @staticmethod
  def perspective_lv_list(params):
    """Query the list of logical volumes in a given volume group.

    """
    vgname = params[0]
    return backend.GetVolumeList(vgname)

  @staticmethod
  def perspective_vg_list(params):
    """Query the list of volume groups.

    """
    return backend.ListVolumeGroups()

  # Storage --------------------------

  @staticmethod
  def perspective_storage_list(params):
    """Get list of storage units.

    """
    (su_name, su_args, name, fields) = params
    return storage.GetStorage(su_name, *su_args).List(name, fields)

  @staticmethod
  def perspective_storage_modify(params):
    """Modify a storage unit.

    """
    (su_name, su_args, name, changes) = params
    return storage.GetStorage(su_name, *su_args).Modify(name, changes)

  @staticmethod
  def perspective_storage_execute(params):
    """Execute an operation on a storage unit.

    """
    (su_name, su_args, name, op) = params
    return storage.GetStorage(su_name, *su_args).Execute(name, op)

  # bridge  --------------------------

  @staticmethod
  def perspective_bridges_exist(params):
    """Check if all bridges given exist on this node.

    """
    bridges_list = params[0]
    return backend.BridgesExist(bridges_list)

  # instance  --------------------------

  @staticmethod
  def perspective_instance_os_add(params):
    """Install an OS on a given instance.

    """
    inst_s = params[0]
    inst = objects.Instance.FromDict(inst_s)
    reinstall = params[1]
    debug = params[2]
    return backend.InstanceOsAdd(inst, reinstall, debug)

  @staticmethod
  def perspective_instance_run_rename(params):
    """Runs the OS rename script for an instance.

    """
    inst_s, old_name, debug = params
    inst = objects.Instance.FromDict(inst_s)
    return backend.RunRenameInstance(inst, old_name, debug)

  @staticmethod
  def perspective_instance_os_import(params):
    """Run the import function of an OS onto a given instance.

    """
    inst_s, src_node, src_images, cluster_name, debug = params
    inst = objects.Instance.FromDict(inst_s)
    return backend.ImportOSIntoInstance(inst, src_node, src_images,
                                        cluster_name, debug)

  @staticmethod
  def perspective_instance_shutdown(params):
    """Shutdown an instance.

    """
    instance = objects.Instance.FromDict(params[0])
    timeout = params[1]
    return backend.InstanceShutdown(instance, timeout)

  @staticmethod
  def perspective_instance_start(params):
    """Start an instance.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.StartInstance(instance)

  @staticmethod
  def perspective_migration_info(params):
    """Gather information about an instance to be migrated.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.MigrationInfo(instance)

  @staticmethod
  def perspective_accept_instance(params):
    """Prepare the node to accept an instance.

    """
    instance, info, target = params
    instance = objects.Instance.FromDict(instance)
    return backend.AcceptInstance(instance, info, target)

  @staticmethod
  def perspective_finalize_migration(params):
    """Finalize the instance migration.

    """
    instance, info, success = params
    instance = objects.Instance.FromDict(instance)
    return backend.FinalizeMigration(instance, info, success)

  @staticmethod
  def perspective_instance_migrate(params):
    """Migrates an instance.

    """
    instance, target, live = params
    instance = objects.Instance.FromDict(instance)
    return backend.MigrateInstance(instance, target, live)

  @staticmethod
  def perspective_instance_reboot(params):
    """Reboot an instance.

    """
    instance = objects.Instance.FromDict(params[0])
    reboot_type = params[1]
    shutdown_timeout = params[2]
    return backend.InstanceReboot(instance, reboot_type, shutdown_timeout)

  @staticmethod
  def perspective_instance_info(params):
    """Query instance information.

    """
    return backend.GetInstanceInfo(params[0], params[1])

  @staticmethod
  def perspective_instance_migratable(params):
    """Query whether the specified instance can be migrated.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.GetInstanceMigratable(instance)

  @staticmethod
  def perspective_all_instances_info(params):
    """Query information about all instances.

    """
    return backend.GetAllInstancesInfo(params[0])

  @staticmethod
  def perspective_instance_list(params):
    """Query the list of running instances.

    """
    return backend.GetInstanceList(params[0])

  # node --------------------------

  @staticmethod
  def perspective_node_tcp_ping(params):
    """Do a TcpPing on the remote node.

    """
    return utils.TcpPing(params[1], params[2], timeout=params[3],
                         live_port_needed=params[4], source=params[0])

  @staticmethod
  def perspective_node_has_ip_address(params):
    """Checks if a node has the given ip address.

    """
    return utils.OwnIpAddress(params[0])

  @staticmethod
  def perspective_node_info(params):
    """Query node information.

    """
    vgname, hypervisor_type = params
    return backend.GetNodeInfo(vgname, hypervisor_type)

  @staticmethod
  def perspective_node_add(params):
    """Complete the registration of this node in the cluster.

    """
    return backend.AddNode(params[0], params[1], params[2],
                           params[3], params[4], params[5])

  @staticmethod
  def perspective_node_verify(params):
    """Run a verify sequence on this node.

    """
    return backend.VerifyNode(params[0], params[1])

  @staticmethod
  def perspective_node_start_master(params):
    """Promote this node to master status.

    """
    return backend.StartMaster(params[0], params[1])

  @staticmethod
  def perspective_node_stop_master(params):
    """Demote this node from master status.

    """
    return backend.StopMaster(params[0])

  @staticmethod
  def perspective_node_leave_cluster(params):
    """Cleanup after leaving a cluster.

    """
    return backend.LeaveCluster(params[0])

  @staticmethod
  def perspective_node_volumes(params):
    """Query the list of all logical volume groups.

    """
    return backend.NodeVolumes()

  @staticmethod
  def perspective_node_demote_from_mc(params):
    """Demote a node from the master candidate role.

    """
    return backend.DemoteFromMC()


  @staticmethod
  def perspective_node_powercycle(params):
    """Tries to powercycle the node.

    """
    hypervisor_type = params[0]
    return backend.PowercycleNode(hypervisor_type)


  # cluster --------------------------

  @staticmethod
  def perspective_version(params):
    """Query version information.

    """
    return constants.PROTOCOL_VERSION

  @staticmethod
  def perspective_upload_file(params):
    """Upload a file.

    Note that the backend implementation imposes strict rules on which
    files are accepted.

    """
    return backend.UploadFile(*params)

  @staticmethod
  def perspective_master_info(params):
    """Query master information.

    """
    return backend.GetMasterInfo()

  @staticmethod
  def perspective_write_ssconf_files(params):
    """Write ssconf files.

    """
    (values,) = params
    return backend.WriteSsconfFiles(values)

  # os -----------------------

  @staticmethod
  def perspective_os_diagnose(params):
    """Query detailed information about existing OSes.

    """
    return backend.DiagnoseOS()

  @staticmethod
  def perspective_os_get(params):
    """Query information about a given OS.

    """
    name = params[0]
    os_obj = backend.OSFromDisk(name)
    return os_obj.ToDict()

  # hooks -----------------------

  @staticmethod
  def perspective_hooks_runner(params):
    """Run hook scripts.

    """
    hpath, phase, env = params
    hr = backend.HooksRunner()
    return hr.RunHooks(hpath, phase, env)

  # iallocator -----------------

  @staticmethod
  def perspective_iallocator_runner(params):
    """Run an iallocator script.

    """
    name, idata = params
    iar = backend.IAllocatorRunner()
    return iar.Run(name, idata)

  # test -----------------------

  @staticmethod
  def perspective_test_delay(params):
    """Run test delay.

    """
    duration = params[0]
    status, rval = utils.TestDelay(duration)
    if not status:
      raise backend.RPCFail(rval)
    return rval

  # file storage ---------------

  @staticmethod
  def perspective_file_storage_dir_create(params):
    """Create the file storage directory.

    """
    file_storage_dir = params[0]
    return backend.CreateFileStorageDir(file_storage_dir)

  @staticmethod
  def perspective_file_storage_dir_remove(params):
    """Remove the file storage directory.

    """
    file_storage_dir = params[0]
    return backend.RemoveFileStorageDir(file_storage_dir)

  @staticmethod
  def perspective_file_storage_dir_rename(params):
    """Rename the file storage directory.

    """
    old_file_storage_dir = params[0]
    new_file_storage_dir = params[1]
    return backend.RenameFileStorageDir(old_file_storage_dir,
                                        new_file_storage_dir)

  # jobs ------------------------

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_update(params):
    """Update job queue.

    """
    (file_name, content) = params
    return backend.JobQueueUpdate(file_name, content)

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_purge(params):
    """Purge job queue.

    """
    return backend.JobQueuePurge()

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_rename(params):
    """Rename a job queue file.

    """
    # TODO: What if a file fails to rename?
    return [backend.JobQueueRename(old, new) for old, new in params]

  @staticmethod
  def perspective_jobqueue_set_drain(params):
    """Set/unset the queue drain flag.

    """
    drain_flag = params[0]
    return backend.JobQueueSetDrainFlag(drain_flag)


  # hypervisor ---------------

  @staticmethod
  def perspective_hypervisor_validate_params(params):
    """Validate the hypervisor parameters.

    """
    (hvname, hvparams) = params
    return backend.ValidateHVParams(hvname, hvparams)

  # Crypto

  @staticmethod
  def perspective_create_x509_certificate(params):
    """Creates a new X509 certificate for SSL/TLS.

    """
    (validity, ) = params
    return backend.CreateX509Certificate(validity)

  @staticmethod
  def perspective_remove_x509_certificate(params):
    """Removes an X509 certificate.

    """
    (name, ) = params
    return backend.RemoveX509Certificate(name)


def CheckNoded(_, args):
  """Initial checks whether to run or exit with a failure.

  """
  if args: # noded doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
                          sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)


def ExecNoded(options, _):
  """Main node daemon function, executed with the PID file held.

  """
  # Read SSL certificate
  if options.ssl:
    ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
                                    ssl_cert_path=options.ssl_cert)
  else:
    ssl_params = None

  err = _PrepareQueueLock()
  if err is not None:
    # this might be some kind of file-system/permission error; while
    # this breaks the job queue functionality, we shouldn't prevent
    # startup of the whole node daemon because of this
    logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)

  mainloop = daemon.Mainloop()
  server = NodeHttpServer(mainloop, options.bind_address, options.port,
                          ssl_params=ssl_params, ssl_verify_peer=True)
  server.Start()
  try:
    mainloop.Run()
  finally:
    server.Stop()


def main():
  """Main function for the node daemon.

  """
  parser = OptionParser(description="Ganeti node daemon",
                        usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
  dirs.append((constants.LOG_OS_DIR, 0750))
  dirs.append((constants.LOCK_DIR, 1777))
  dirs.append((constants.CRYPTO_KEYS_DIR, constants.CRYPTO_KEYS_DIR_MODE))
  daemon.GenericMain(constants.NODED, parser, dirs, CheckNoded, ExecNoded,
                     default_ssl_cert=constants.NODED_CERT_FILE,
                     default_ssl_key=constants.NODED_CERT_FILE)


if __name__ == '__main__':
  main()
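
# Illustrative client sketch (not part of the daemon): HandleRequest above
# accepts only HTTP PUT requests whose path names a perspective_* method and
# whose body is a JSON-encoded parameter list, and it replies with a JSON
# (success, payload) pair.  A minimal call from the master side could look
# like the following; the port 1811 and the certificate/key path are
# assumptions for illustration, not values defined in this file:
#
#   import httplib, simplejson
#   conn = httplib.HTTPSConnection("node1.example.com", 1811,
#                                  key_file="/var/lib/ganeti/server.pem",
#                                  cert_file="/var/lib/ganeti/server.pem")
#   conn.request("PUT", "/version", simplejson.dumps([]))
#   success, payload = simplejson.loads(conn.getresponse().read())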