Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-noded @ ab221ddf

History | View | Annotate | Download (22.1 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti node daemon"""
23

    
24
# pylint: disable-msg=C0103,W0142
25

    
26
# C0103: Functions in this module need to have a given name structure,
27
# and the name of the daemon doesn't match
28

    
29
# W0142: Used * or ** magic, since we do use it extensively in this
30
# module
31

    
32
import os
33
import sys
34
import logging
35
import signal
36

    
37
from optparse import OptionParser
38

    
39
from ganeti import backend
40
from ganeti import constants
41
from ganeti import objects
42
from ganeti import errors
43
from ganeti import jstore
44
from ganeti import daemon
45
from ganeti import http
46
from ganeti import utils
47
from ganeti import storage
48
from ganeti import serializer
49

    
50
import ganeti.http.server # pylint: disable-msg=W0611
51

    
52

    
53
# Global job queue lock; replaced with a real lock object by ExecNoded()
# (via jstore.InitAndVerifyQueue) before the HTTP server starts serving.
# Used by the _RequireJobQueueLock decorator around the jobqueue RPCs.
queue_lock = None
54

    
55

    
56
def _RequireJobQueueLock(fn):
57
  """Decorator for job queue manipulating functions.
58

    
59
  """
60
  QUEUE_LOCK_TIMEOUT = 10
61

    
62
  def wrapper(*args, **kwargs):
63
    # Locking in exclusive, blocking mode because there could be several
64
    # children running at the same time. Waiting up to 10 seconds.
65
    queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
66
    try:
67
      return fn(*args, **kwargs)
68
    finally:
69
      queue_lock.Unlock()
70

    
71
  return wrapper
72

    
73

    
74
class NodeHttpServer(http.server.HttpServer):
  """The server implementation.

  This class holds all methods exposed over the RPC interface.

  Each call is dispatched by L{HandleRequest} to the
  C{perspective_<name>} static method matching the request path; the
  JSON-decoded request body supplies the method's C{params}.

  """
  # too many public methods, and unused args - all methods get params
  # due to the API
  # pylint: disable-msg=R0904,W0613
  def __init__(self, *args, **kwargs):
    http.server.HttpServer.__init__(self, *args, **kwargs)
    # remember our own PID so the QuitGanetiException handler can
    # signal the daemon to terminate
    self.noded_pid = os.getpid()

  def HandleRequest(self, req):
    """Handle a request.

    Only HTTP PUT is accepted. The request path (minus the leading
    slash) selects the C{perspective_*} method; the JSON body is its
    parameter list. Returns a JSON-encoded (status, payload) tuple.

    """
    if req.request_method.upper() != http.HTTP_PUT:
      raise http.HttpBadRequest()

    path = req.request_path
    if path.startswith("/"):
      path = path[1:]

    method = getattr(self, "perspective_%s" % path, None)
    if method is None:
      raise http.HttpNotFound()

    try:
      result = (True, method(serializer.LoadJson(req.request_body)))

    except backend.RPCFail, err:
      # our custom failure exception; str(err) works fine if the
      # exception was constructed with a single argument, and in
      # this case, err.message == err.args[0] == str(err)
      result = (False, str(err))
    except errors.QuitGanetiException, err:
      # Tell parent to quit
      logging.info("Shutting down the node daemon, arguments: %s",
                   str(err.args))
      os.kill(self.noded_pid, signal.SIGTERM)
      # And return the error's arguments, which must be already in
      # correct tuple format
      result = err.args
    except Exception, err:
      # catch-all boundary: report the failure to the caller instead
      # of letting the HTTP layer see the exception
      logging.exception("Error in RPC call")
      result = (False, "Error while executing backend function: %s" % str(err))

    return serializer.DumpJson(result, indent=False)

  # the new block devices  --------------------------

  @staticmethod
  def perspective_blockdev_create(params):
    """Create a block device.

    """
    bdev_s, size, owner, on_primary, info = params
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevCreate(bdev, size, owner, on_primary, info)

  @staticmethod
  def perspective_blockdev_remove(params):
    """Remove a block device.

    """
    bdev_s = params[0]
    bdev = objects.Disk.FromDict(bdev_s)
    return backend.BlockdevRemove(bdev)

  @staticmethod
  def perspective_blockdev_rename(params):
    """Rename a block device.

    """
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
    return backend.BlockdevRename(devlist)

  @staticmethod
  def perspective_blockdev_assemble(params):
    """Assemble a block device.

    """
    bdev_s, owner, on_primary = params
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevAssemble(bdev, owner, on_primary)

  @staticmethod
  def perspective_blockdev_shutdown(params):
    """Shutdown a block device.

    """
    bdev_s = params[0]
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevShutdown(bdev)

  @staticmethod
  def perspective_blockdev_addchildren(params):
    """Add a child to a mirror device.

    Note: this is only valid for mirror devices. It's the caller's duty
    to send a correct disk, otherwise we raise an error.

    """
    bdev_s, ndev_s = params
    bdev = objects.Disk.FromDict(bdev_s)
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
    if bdev is None or ndevs.count(None) > 0:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevAddchildren(bdev, ndevs)

  @staticmethod
  def perspective_blockdev_removechildren(params):
    """Remove a child from a mirror device.

    This is only valid for mirror devices, of course. It's the callers
    duty to send a correct disk, otherwise we raise an error.

    """
    bdev_s, ndev_s = params
    bdev = objects.Disk.FromDict(bdev_s)
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
    if bdev is None or ndevs.count(None) > 0:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevRemovechildren(bdev, ndevs)

  @staticmethod
  def perspective_blockdev_getmirrorstatus(params):
    """Return the mirror status for a list of disks.

    """
    disks = [objects.Disk.FromDict(dsk_s)
             for dsk_s in params]
    return [status.ToDict()
            for status in backend.BlockdevGetmirrorstatus(disks)]

  @staticmethod
  def perspective_blockdev_find(params):
    """Expose the FindBlockDevice functionality for a disk.

    This will try to find but not activate a disk.

    """
    disk = objects.Disk.FromDict(params[0])

    result = backend.BlockdevFind(disk)
    if result is None:
      return None

    return result.ToDict()

  @staticmethod
  def perspective_blockdev_snapshot(params):
    """Create a snapshot device.

    Note that this is only valid for LVM disks, if we get passed
    something else we raise an exception. The snapshot device can be
    removed by calling the generic block device remove call.

    """
    cfbd = objects.Disk.FromDict(params[0])
    return backend.BlockdevSnapshot(cfbd)

  @staticmethod
  def perspective_blockdev_grow(params):
    """Grow a stack of devices.

    """
    cfbd = objects.Disk.FromDict(params[0])
    amount = params[1]
    return backend.BlockdevGrow(cfbd, amount)

  @staticmethod
  def perspective_blockdev_close(params):
    """Closes the given block devices.

    """
    disks = [objects.Disk.FromDict(cf) for cf in params[1]]
    return backend.BlockdevClose(params[0], disks)

  @staticmethod
  def perspective_blockdev_getsize(params):
    """Compute the sizes of the given block devices.

    """
    disks = [objects.Disk.FromDict(cf) for cf in params[0]]
    return backend.BlockdevGetsize(disks)

  @staticmethod
  def perspective_blockdev_export(params):
    """Export a block device to a remote node.

    """
    disk = objects.Disk.FromDict(params[0])
    dest_node, dest_path, cluster_name = params[1:]
    return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)

  # blockdev/drbd specific methods ----------

  @staticmethod
  def perspective_drbd_disconnect_net(params):
    """Disconnects the network connection of drbd disks.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    nodes_ip, disks = params
    disks = [objects.Disk.FromDict(cf) for cf in disks]
    return backend.DrbdDisconnectNet(nodes_ip, disks)

  @staticmethod
  def perspective_drbd_attach_net(params):
    """Attaches the network connection of drbd disks.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    nodes_ip, disks, instance_name, multimaster = params
    disks = [objects.Disk.FromDict(cf) for cf in disks]
    return backend.DrbdAttachNet(nodes_ip, disks,
                                     instance_name, multimaster)

  @staticmethod
  def perspective_drbd_wait_sync(params):
    """Wait until DRBD disks are synced.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    nodes_ip, disks = params
    disks = [objects.Disk.FromDict(cf) for cf in disks]
    return backend.DrbdWaitSync(nodes_ip, disks)

  # export/import  --------------------------

  @staticmethod
  def perspective_snapshot_export(params):
    """Export a given snapshot.

    """
    disk = objects.Disk.FromDict(params[0])
    dest_node = params[1]
    instance = objects.Instance.FromDict(params[2])
    cluster_name = params[3]
    dev_idx = params[4]
    return backend.ExportSnapshot(disk, dest_node, instance,
                                  cluster_name, dev_idx)

  @staticmethod
  def perspective_finalize_export(params):
    """Expose the finalize export functionality.

    """
    instance = objects.Instance.FromDict(params[0])
    snap_disks = [objects.Disk.FromDict(str_data)
                  for str_data in params[1]]
    return backend.FinalizeExport(instance, snap_disks)

  @staticmethod
  def perspective_export_info(params):
    """Query information about an existing export on this node.

    The given path may not contain an export, in which case we return
    None.

    """
    path = params[0]
    return backend.ExportInfo(path)

  @staticmethod
  def perspective_export_list(params):
    """List the available exports on this node.

    Note that as opposed to export_info, which may query data about an
    export in any path, this only queries the standard Ganeti path
    (constants.EXPORT_DIR).

    """
    return backend.ListExports()

  @staticmethod
  def perspective_export_remove(params):
    """Remove an export.

    """
    export = params[0]
    return backend.RemoveExport(export)

  # volume  --------------------------

  @staticmethod
  def perspective_lv_list(params):
    """Query the list of logical volumes in a given volume group.

    """
    vgname = params[0]
    return backend.GetVolumeList(vgname)

  @staticmethod
  def perspective_vg_list(params):
    """Query the list of volume groups.

    """
    return backend.ListVolumeGroups()

  # Storage --------------------------

  @staticmethod
  def perspective_storage_list(params):
    """Get list of storage units.

    """
    (su_name, su_args, name, fields) = params
    return storage.GetStorage(su_name, *su_args).List(name, fields)

  @staticmethod
  def perspective_storage_modify(params):
    """Modify a storage unit.

    """
    (su_name, su_args, name, changes) = params
    return storage.GetStorage(su_name, *su_args).Modify(name, changes)

  @staticmethod
  def perspective_storage_execute(params):
    """Execute an operation on a storage unit.

    """
    (su_name, su_args, name, op) = params
    return storage.GetStorage(su_name, *su_args).Execute(name, op)

  # bridge  --------------------------

  @staticmethod
  def perspective_bridges_exist(params):
    """Check if all bridges given exist on this node.

    """
    bridges_list = params[0]
    return backend.BridgesExist(bridges_list)

  # instance  --------------------------

  @staticmethod
  def perspective_instance_os_add(params):
    """Install an OS on a given instance.

    """
    inst_s = params[0]
    inst = objects.Instance.FromDict(inst_s)
    reinstall = params[1]
    return backend.InstanceOsAdd(inst, reinstall)

  @staticmethod
  def perspective_instance_run_rename(params):
    """Runs the OS rename script for an instance.

    """
    inst_s, old_name = params
    inst = objects.Instance.FromDict(inst_s)
    return backend.RunRenameInstance(inst, old_name)

  @staticmethod
  def perspective_instance_os_import(params):
    """Run the import function of an OS onto a given instance.

    """
    inst_s, src_node, src_images, cluster_name = params
    inst = objects.Instance.FromDict(inst_s)
    return backend.ImportOSIntoInstance(inst, src_node, src_images,
                                        cluster_name)

  @staticmethod
  def perspective_instance_shutdown(params):
    """Shutdown an instance.

    """
    instance = objects.Instance.FromDict(params[0])
    timeout = params[1]
    return backend.InstanceShutdown(instance, timeout)

  @staticmethod
  def perspective_instance_start(params):
    """Start an instance.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.StartInstance(instance)

  @staticmethod
  def perspective_migration_info(params):
    """Gather information about an instance to be migrated.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.MigrationInfo(instance)

  @staticmethod
  def perspective_accept_instance(params):
    """Prepare the node to accept an instance.

    """
    instance, info, target = params
    instance = objects.Instance.FromDict(instance)
    return backend.AcceptInstance(instance, info, target)

  @staticmethod
  def perspective_finalize_migration(params):
    """Finalize the instance migration.

    """
    instance, info, success = params
    instance = objects.Instance.FromDict(instance)
    return backend.FinalizeMigration(instance, info, success)

  @staticmethod
  def perspective_instance_migrate(params):
    """Migrates an instance.

    """
    instance, target, live = params
    instance = objects.Instance.FromDict(instance)
    return backend.MigrateInstance(instance, target, live)

  @staticmethod
  def perspective_instance_reboot(params):
    """Reboot an instance.

    """
    instance = objects.Instance.FromDict(params[0])
    reboot_type = params[1]
    shutdown_timeout = params[2]
    return backend.InstanceReboot(instance, reboot_type, shutdown_timeout)

  @staticmethod
  def perspective_instance_info(params):
    """Query instance information.

    """
    return backend.GetInstanceInfo(params[0], params[1])

  @staticmethod
  def perspective_instance_migratable(params):
    """Query whether the specified instance can be migrated.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.GetInstanceMigratable(instance)

  @staticmethod
  def perspective_all_instances_info(params):
    """Query information about all instances.

    """
    return backend.GetAllInstancesInfo(params[0])

  @staticmethod
  def perspective_instance_list(params):
    """Query the list of running instances.

    """
    return backend.GetInstanceList(params[0])

  # node --------------------------

  @staticmethod
  def perspective_node_tcp_ping(params):
    """Do a TcpPing on the remote node.

    """
    return utils.TcpPing(params[1], params[2], timeout=params[3],
                         live_port_needed=params[4], source=params[0])

  @staticmethod
  def perspective_node_has_ip_address(params):
    """Checks if a node has the given ip address.

    """
    return utils.OwnIpAddress(params[0])

  @staticmethod
  def perspective_node_info(params):
    """Query node information.

    """
    vgname, hypervisor_type = params
    return backend.GetNodeInfo(vgname, hypervisor_type)

  @staticmethod
  def perspective_node_add(params):
    """Complete the registration of this node in the cluster.

    """
    return backend.AddNode(params[0], params[1], params[2],
                           params[3], params[4], params[5])

  @staticmethod
  def perspective_node_verify(params):
    """Run a verify sequence on this node.

    """
    return backend.VerifyNode(params[0], params[1])

  @staticmethod
  def perspective_node_start_master(params):
    """Promote this node to master status.

    """
    return backend.StartMaster(params[0], params[1])

  @staticmethod
  def perspective_node_stop_master(params):
    """Demote this node from master status.

    """
    return backend.StopMaster(params[0])

  @staticmethod
  def perspective_node_leave_cluster(params):
    """Cleanup after leaving a cluster.

    """
    return backend.LeaveCluster(params[0])

  @staticmethod
  def perspective_node_volumes(params):
    """Query the list of all logical volume groups.

    """
    return backend.NodeVolumes()

  @staticmethod
  def perspective_node_demote_from_mc(params):
    """Demote a node from the master candidate role.

    """
    return backend.DemoteFromMC()


  @staticmethod
  def perspective_node_powercycle(params):
    """Tries to powercycle the node.

    """
    hypervisor_type = params[0]
    return backend.PowercycleNode(hypervisor_type)


  # cluster --------------------------

  @staticmethod
  def perspective_version(params):
    """Query version information.

    """
    return constants.PROTOCOL_VERSION

  @staticmethod
  def perspective_upload_file(params):
    """Upload a file.

    Note that the backend implementation imposes strict rules on which
    files are accepted.

    """
    return backend.UploadFile(*params)

  @staticmethod
  def perspective_master_info(params):
    """Query master information.

    """
    return backend.GetMasterInfo()

  @staticmethod
  def perspective_write_ssconf_files(params):
    """Write ssconf files.

    """
    (values,) = params
    return backend.WriteSsconfFiles(values)

  # os -----------------------

  @staticmethod
  def perspective_os_diagnose(params):
    """Query detailed information about existing OSes.

    """
    return backend.DiagnoseOS()

  @staticmethod
  def perspective_os_get(params):
    """Query information about a given OS.

    """
    name = params[0]
    os_obj = backend.OSFromDisk(name)
    return os_obj.ToDict()

  # hooks -----------------------

  @staticmethod
  def perspective_hooks_runner(params):
    """Run hook scripts.

    """
    hpath, phase, env = params
    hr = backend.HooksRunner()
    return hr.RunHooks(hpath, phase, env)

  # iallocator -----------------

  @staticmethod
  def perspective_iallocator_runner(params):
    """Run an iallocator script.

    """
    name, idata = params
    iar = backend.IAllocatorRunner()
    return iar.Run(name, idata)

  # test -----------------------

  @staticmethod
  def perspective_test_delay(params):
    """Run test delay.

    """
    duration = params[0]
    status, rval = utils.TestDelay(duration)
    if not status:
      raise backend.RPCFail(rval)
    return rval

  # file storage ---------------

  @staticmethod
  def perspective_file_storage_dir_create(params):
    """Create the file storage directory.

    """
    file_storage_dir = params[0]
    return backend.CreateFileStorageDir(file_storage_dir)

  @staticmethod
  def perspective_file_storage_dir_remove(params):
    """Remove the file storage directory.

    """
    file_storage_dir = params[0]
    return backend.RemoveFileStorageDir(file_storage_dir)

  @staticmethod
  def perspective_file_storage_dir_rename(params):
    """Rename the file storage directory.

    """
    old_file_storage_dir = params[0]
    new_file_storage_dir = params[1]
    return backend.RenameFileStorageDir(old_file_storage_dir,
                                        new_file_storage_dir)

  # jobs ------------------------

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_update(params):
    """Update job queue.

    """
    (file_name, content) = params
    return backend.JobQueueUpdate(file_name, content)

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_purge(params):
    """Purge job queue.

    """
    return backend.JobQueuePurge()

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_rename(params):
    """Rename a job queue file.

    """
    # TODO: What if a file fails to rename?
    return [backend.JobQueueRename(old, new) for old, new in params]

  @staticmethod
  def perspective_jobqueue_set_drain(params):
    """Set/unset the queue drain flag.

    """
    drain_flag = params[0]
    return backend.JobQueueSetDrainFlag(drain_flag)


  # hypervisor ---------------

  @staticmethod
  def perspective_hypervisor_validate_params(params):
    """Validate the hypervisor parameters.

    """
    (hvname, hvparams) = params
    return backend.ValidateHVParams(hvname, hvparams)
792

    
793

    
794
def CheckNoded(_, args):
  """Initial checks whether to run or exit with a failure.

  The node daemon accepts no positional arguments; if any were given,
  print a usage message to stderr and exit with a failure code.

  """
  if not args: # noded doesn't take any arguments
    return
  sys.stderr.write("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]\n" %
                   sys.argv[0])
  sys.exit(constants.EXIT_FAILURE)
802

    
803

    
804
def ExecNoded(options, _):
  """Main node daemon function, executed with the PID file held.

  @param options: parsed command-line options (ssl, ssl_key, ssl_cert,
      bind_address, port)

  """
  global queue_lock # pylint: disable-msg=W0603

  # Read SSL certificate
  if options.ssl:
    ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
                                    ssl_cert_path=options.ssl_cert)
  else:
    ssl_params = None

  # Prepare job queue; must happen before the server starts, since the
  # jobqueue RPC handlers rely on queue_lock being a real lock object
  queue_lock = jstore.InitAndVerifyQueue(must_lock=False)

  mainloop = daemon.Mainloop()
  server = NodeHttpServer(mainloop, options.bind_address, options.port,
                          ssl_params=ssl_params, ssl_verify_peer=True)
  server.Start()
  try:
    mainloop.Run()
  finally:
    # make sure the listening socket is released on the way out
    server.Stop()
828

    
829

    
830
def main():
831
  """Main function for the node daemon.
832

    
833
  """
834
  parser = OptionParser(description="Ganeti node daemon",
835
                        usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
836
                        version="%%prog (ganeti) %s" %
837
                        constants.RELEASE_VERSION)
838
  dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
839
  dirs.append((constants.LOG_OS_DIR, 0750))
840
  dirs.append((constants.LOCK_DIR, 1777))
841
  daemon.GenericMain(constants.NODED, parser, dirs, CheckNoded, ExecNoded,
842
                     default_ssl_cert=constants.SSL_CERT_FILE,
843
                     default_ssl_key=constants.SSL_CERT_FILE)
844

    
845

    
846
# Script entry point: run the daemon (option parsing, daemonization and
# directory setup all happen inside main()).
if __name__ == '__main__':
  main()