Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-noded @ 2d54e29c

History | View | Annotate | Download (21.9 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti node daemon"""
23

    
24
# pylint: disable-msg=C0103,W0142
25

    
26
# C0103: Functions in this module need to have a given name structure,
27
# and the name of the daemon doesn't match
28

    
29
# W0142: Used * or ** magic, since we do use it extensively in this
30
# module
31

    
32
import os
33
import sys
34
import SocketServer
35
import logging
36
import signal
37

    
38
from optparse import OptionParser
39

    
40
from ganeti import backend
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import errors
44
from ganeti import jstore
45
from ganeti import daemon
46
from ganeti import http
47
from ganeti import utils
48
from ganeti import storage
49

    
50
import ganeti.http.server
51

    
52

    
53
queue_lock = None
54

    
55

    
56
def _RequireJobQueueLock(fn):
57
  """Decorator for job queue manipulating functions.
58

    
59
  """
60
  QUEUE_LOCK_TIMEOUT = 10
61

    
62
  def wrapper(*args, **kwargs):
63
    # Locking in exclusive, blocking mode because there could be several
64
    # children running at the same time. Waiting up to 10 seconds.
65
    queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
66
    try:
67
      return fn(*args, **kwargs)
68
    finally:
69
      queue_lock.Unlock()
70

    
71
  return wrapper
72

    
73

    
74
class NodeHttpServer(http.server.HttpServer):
75
  """The server implementation.
76

    
77
  This class holds all methods exposed over the RPC interface.
78

    
79
  """
80
  # too many public methods, and unused args - all methods get params
81
  # due to the API
82
  # pylint: disable-msg=R0904,W0613
83
  def __init__(self, *args, **kwargs):
84
    http.server.HttpServer.__init__(self, *args, **kwargs)
85
    self.noded_pid = os.getpid()
86

    
87
  def HandleRequest(self, req):
88
    """Handle a request.
89

    
90
    """
91
    if req.request_method.upper() != http.HTTP_PUT:
92
      raise http.HttpBadRequest()
93

    
94
    path = req.request_path
95
    if path.startswith("/"):
96
      path = path[1:]
97

    
98
    method = getattr(self, "perspective_%s" % path, None)
99
    if method is None:
100
      raise http.HttpNotFound()
101

    
102
    try:
103
      rvalue = method(req.request_body)
104
      return True, rvalue
105

    
106
    except backend.RPCFail, err:
107
      # our custom failure exception; str(err) works fine if the
108
      # exception was constructed with a single argument, and in
109
      # this case, err.message == err.args[0] == str(err)
110
      return (False, str(err))
111
    except errors.QuitGanetiException, err:
112
      # Tell parent to quit
113
      logging.info("Shutting down the node daemon, arguments: %s",
114
                   str(err.args))
115
      os.kill(self.noded_pid, signal.SIGTERM)
116
      # And return the error's arguments, which must be already in
117
      # correct tuple format
118
      return err.args
119
    except Exception, err:
120
      logging.exception("Error in RPC call")
121
      return False, "Error while executing backend function: %s" % str(err)
122

    
123
  # the new block devices  --------------------------
124

    
125
  @staticmethod
126
  def perspective_blockdev_create(params):
127
    """Create a block device.
128

    
129
    """
130
    bdev_s, size, owner, on_primary, info = params
131
    bdev = objects.Disk.FromDict(bdev_s)
132
    if bdev is None:
133
      raise ValueError("can't unserialize data!")
134
    return backend.BlockdevCreate(bdev, size, owner, on_primary, info)
135

    
136
  @staticmethod
137
  def perspective_blockdev_remove(params):
138
    """Remove a block device.
139

    
140
    """
141
    bdev_s = params[0]
142
    bdev = objects.Disk.FromDict(bdev_s)
143
    return backend.BlockdevRemove(bdev)
144

    
145
  @staticmethod
146
  def perspective_blockdev_rename(params):
147
    """Remove a block device.
148

    
149
    """
150
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
151
    return backend.BlockdevRename(devlist)
152

    
153
  @staticmethod
154
  def perspective_blockdev_assemble(params):
155
    """Assemble a block device.
156

    
157
    """
158
    bdev_s, owner, on_primary = params
159
    bdev = objects.Disk.FromDict(bdev_s)
160
    if bdev is None:
161
      raise ValueError("can't unserialize data!")
162
    return backend.BlockdevAssemble(bdev, owner, on_primary)
163

    
164
  @staticmethod
165
  def perspective_blockdev_shutdown(params):
166
    """Shutdown a block device.
167

    
168
    """
169
    bdev_s = params[0]
170
    bdev = objects.Disk.FromDict(bdev_s)
171
    if bdev is None:
172
      raise ValueError("can't unserialize data!")
173
    return backend.BlockdevShutdown(bdev)
174

    
175
  @staticmethod
176
  def perspective_blockdev_addchildren(params):
177
    """Add a child to a mirror device.
178

    
179
    Note: this is only valid for mirror devices. It's the caller's duty
180
    to send a correct disk, otherwise we raise an error.
181

    
182
    """
183
    bdev_s, ndev_s = params
184
    bdev = objects.Disk.FromDict(bdev_s)
185
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
186
    if bdev is None or ndevs.count(None) > 0:
187
      raise ValueError("can't unserialize data!")
188
    return backend.BlockdevAddchildren(bdev, ndevs)
189

    
190
  @staticmethod
191
  def perspective_blockdev_removechildren(params):
192
    """Remove a child from a mirror device.
193

    
194
    This is only valid for mirror devices, of course. It's the callers
195
    duty to send a correct disk, otherwise we raise an error.
196

    
197
    """
198
    bdev_s, ndev_s = params
199
    bdev = objects.Disk.FromDict(bdev_s)
200
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
201
    if bdev is None or ndevs.count(None) > 0:
202
      raise ValueError("can't unserialize data!")
203
    return backend.BlockdevRemovechildren(bdev, ndevs)
204

    
205
  @staticmethod
206
  def perspective_blockdev_getmirrorstatus(params):
207
    """Return the mirror status for a list of disks.
208

    
209
    """
210
    disks = [objects.Disk.FromDict(dsk_s)
211
             for dsk_s in params]
212
    return [status.ToDict()
213
            for status in backend.BlockdevGetmirrorstatus(disks)]
214

    
215
  @staticmethod
216
  def perspective_blockdev_find(params):
217
    """Expose the FindBlockDevice functionality for a disk.
218

    
219
    This will try to find but not activate a disk.
220

    
221
    """
222
    disk = objects.Disk.FromDict(params[0])
223

    
224
    result = backend.BlockdevFind(disk)
225
    if result is None:
226
      return None
227

    
228
    return result.ToDict()
229

    
230
  @staticmethod
231
  def perspective_blockdev_snapshot(params):
232
    """Create a snapshot device.
233

    
234
    Note that this is only valid for LVM disks, if we get passed
235
    something else we raise an exception. The snapshot device can be
236
    remove by calling the generic block device remove call.
237

    
238
    """
239
    cfbd = objects.Disk.FromDict(params[0])
240
    return backend.BlockdevSnapshot(cfbd)
241

    
242
  @staticmethod
243
  def perspective_blockdev_grow(params):
244
    """Grow a stack of devices.
245

    
246
    """
247
    cfbd = objects.Disk.FromDict(params[0])
248
    amount = params[1]
249
    return backend.BlockdevGrow(cfbd, amount)
250

    
251
  @staticmethod
252
  def perspective_blockdev_close(params):
253
    """Closes the given block devices.
254

    
255
    """
256
    disks = [objects.Disk.FromDict(cf) for cf in params[1]]
257
    return backend.BlockdevClose(params[0], disks)
258

    
259
  @staticmethod
260
  def perspective_blockdev_getsize(params):
261
    """Compute the sizes of the given block devices.
262

    
263
    """
264
    disks = [objects.Disk.FromDict(cf) for cf in params[0]]
265
    return backend.BlockdevGetsize(disks)
266

    
267
  @staticmethod
268
  def perspective_blockdev_export(params):
269
    """Compute the sizes of the given block devices.
270

    
271
    """
272
    disk = objects.Disk.FromDict(params[0])
273
    dest_node, dest_path, cluster_name = params[1:]
274
    return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)
275

    
276
  # blockdev/drbd specific methods ----------
277

    
278
  @staticmethod
279
  def perspective_drbd_disconnect_net(params):
280
    """Disconnects the network connection of drbd disks.
281

    
282
    Note that this is only valid for drbd disks, so the members of the
283
    disk list must all be drbd devices.
284

    
285
    """
286
    nodes_ip, disks = params
287
    disks = [objects.Disk.FromDict(cf) for cf in disks]
288
    return backend.DrbdDisconnectNet(nodes_ip, disks)
289

    
290
  @staticmethod
291
  def perspective_drbd_attach_net(params):
292
    """Attaches the network connection of drbd disks.
293

    
294
    Note that this is only valid for drbd disks, so the members of the
295
    disk list must all be drbd devices.
296

    
297
    """
298
    nodes_ip, disks, instance_name, multimaster = params
299
    disks = [objects.Disk.FromDict(cf) for cf in disks]
300
    return backend.DrbdAttachNet(nodes_ip, disks,
301
                                     instance_name, multimaster)
302

    
303
  @staticmethod
304
  def perspective_drbd_wait_sync(params):
305
    """Wait until DRBD disks are synched.
306

    
307
    Note that this is only valid for drbd disks, so the members of the
308
    disk list must all be drbd devices.
309

    
310
    """
311
    nodes_ip, disks = params
312
    disks = [objects.Disk.FromDict(cf) for cf in disks]
313
    return backend.DrbdWaitSync(nodes_ip, disks)
314

    
315
  # export/import  --------------------------
316

    
317
  @staticmethod
318
  def perspective_snapshot_export(params):
319
    """Export a given snapshot.
320

    
321
    """
322
    disk = objects.Disk.FromDict(params[0])
323
    dest_node = params[1]
324
    instance = objects.Instance.FromDict(params[2])
325
    cluster_name = params[3]
326
    dev_idx = params[4]
327
    return backend.ExportSnapshot(disk, dest_node, instance,
328
                                  cluster_name, dev_idx)
329

    
330
  @staticmethod
331
  def perspective_finalize_export(params):
332
    """Expose the finalize export functionality.
333

    
334
    """
335
    instance = objects.Instance.FromDict(params[0])
336
    snap_disks = [objects.Disk.FromDict(str_data)
337
                  for str_data in params[1]]
338
    return backend.FinalizeExport(instance, snap_disks)
339

    
340
  @staticmethod
341
  def perspective_export_info(params):
342
    """Query information about an existing export on this node.
343

    
344
    The given path may not contain an export, in which case we return
345
    None.
346

    
347
    """
348
    path = params[0]
349
    return backend.ExportInfo(path)
350

    
351
  @staticmethod
352
  def perspective_export_list(params):
353
    """List the available exports on this node.
354

    
355
    Note that as opposed to export_info, which may query data about an
356
    export in any path, this only queries the standard Ganeti path
357
    (constants.EXPORT_DIR).
358

    
359
    """
360
    return backend.ListExports()
361

    
362
  @staticmethod
363
  def perspective_export_remove(params):
364
    """Remove an export.
365

    
366
    """
367
    export = params[0]
368
    return backend.RemoveExport(export)
369

    
370
  # volume  --------------------------
371

    
372
  @staticmethod
373
  def perspective_lv_list(params):
374
    """Query the list of logical volumes in a given volume group.
375

    
376
    """
377
    vgname = params[0]
378
    return backend.GetVolumeList(vgname)
379

    
380
  @staticmethod
381
  def perspective_vg_list(params):
382
    """Query the list of volume groups.
383

    
384
    """
385
    return backend.ListVolumeGroups()
386

    
387
  # Storage --------------------------
388

    
389
  @staticmethod
390
  def perspective_storage_list(params):
391
    """Get list of storage units.
392

    
393
    """
394
    (su_name, su_args, name, fields) = params
395
    return storage.GetStorage(su_name, *su_args).List(name, fields)
396

    
397
  @staticmethod
398
  def perspective_storage_modify(params):
399
    """Modify a storage unit.
400

    
401
    """
402
    (su_name, su_args, name, changes) = params
403
    return storage.GetStorage(su_name, *su_args).Modify(name, changes)
404

    
405
  @staticmethod
406
  def perspective_storage_execute(params):
407
    """Execute an operation on a storage unit.
408

    
409
    """
410
    (su_name, su_args, name, op) = params
411
    return storage.GetStorage(su_name, *su_args).Execute(name, op)
412

    
413
  # bridge  --------------------------
414

    
415
  @staticmethod
416
  def perspective_bridges_exist(params):
417
    """Check if all bridges given exist on this node.
418

    
419
    """
420
    bridges_list = params[0]
421
    return backend.BridgesExist(bridges_list)
422

    
423
  # instance  --------------------------
424

    
425
  @staticmethod
426
  def perspective_instance_os_add(params):
427
    """Install an OS on a given instance.
428

    
429
    """
430
    inst_s = params[0]
431
    inst = objects.Instance.FromDict(inst_s)
432
    reinstall = params[1]
433
    return backend.InstanceOsAdd(inst, reinstall)
434

    
435
  @staticmethod
436
  def perspective_instance_run_rename(params):
437
    """Runs the OS rename script for an instance.
438

    
439
    """
440
    inst_s, old_name = params
441
    inst = objects.Instance.FromDict(inst_s)
442
    return backend.RunRenameInstance(inst, old_name)
443

    
444
  @staticmethod
445
  def perspective_instance_os_import(params):
446
    """Run the import function of an OS onto a given instance.
447

    
448
    """
449
    inst_s, src_node, src_images, cluster_name = params
450
    inst = objects.Instance.FromDict(inst_s)
451
    return backend.ImportOSIntoInstance(inst, src_node, src_images,
452
                                        cluster_name)
453

    
454
  @staticmethod
455
  def perspective_instance_shutdown(params):
456
    """Shutdown an instance.
457

    
458
    """
459
    instance = objects.Instance.FromDict(params[0])
460
    timeout = params[1]
461
    return backend.InstanceShutdown(instance, timeout)
462

    
463
  @staticmethod
464
  def perspective_instance_start(params):
465
    """Start an instance.
466

    
467
    """
468
    instance = objects.Instance.FromDict(params[0])
469
    return backend.StartInstance(instance)
470

    
471
  @staticmethod
472
  def perspective_migration_info(params):
473
    """Gather information about an instance to be migrated.
474

    
475
    """
476
    instance = objects.Instance.FromDict(params[0])
477
    return backend.MigrationInfo(instance)
478

    
479
  @staticmethod
480
  def perspective_accept_instance(params):
481
    """Prepare the node to accept an instance.
482

    
483
    """
484
    instance, info, target = params
485
    instance = objects.Instance.FromDict(instance)
486
    return backend.AcceptInstance(instance, info, target)
487

    
488
  @staticmethod
489
  def perspective_finalize_migration(params):
490
    """Finalize the instance migration.
491

    
492
    """
493
    instance, info, success = params
494
    instance = objects.Instance.FromDict(instance)
495
    return backend.FinalizeMigration(instance, info, success)
496

    
497
  @staticmethod
498
  def perspective_instance_migrate(params):
499
    """Migrates an instance.
500

    
501
    """
502
    instance, target, live = params
503
    instance = objects.Instance.FromDict(instance)
504
    return backend.MigrateInstance(instance, target, live)
505

    
506
  @staticmethod
507
  def perspective_instance_reboot(params):
508
    """Reboot an instance.
509

    
510
    """
511
    instance = objects.Instance.FromDict(params[0])
512
    reboot_type = params[1]
513
    shutdown_timeout = params[2]
514
    return backend.InstanceReboot(instance, reboot_type, shutdown_timeout)
515

    
516
  @staticmethod
517
  def perspective_instance_info(params):
518
    """Query instance information.
519

    
520
    """
521
    return backend.GetInstanceInfo(params[0], params[1])
522

    
523
  @staticmethod
524
  def perspective_instance_migratable(params):
525
    """Query whether the specified instance can be migrated.
526

    
527
    """
528
    instance = objects.Instance.FromDict(params[0])
529
    return backend.GetInstanceMigratable(instance)
530

    
531
  @staticmethod
532
  def perspective_all_instances_info(params):
533
    """Query information about all instances.
534

    
535
    """
536
    return backend.GetAllInstancesInfo(params[0])
537

    
538
  @staticmethod
539
  def perspective_instance_list(params):
540
    """Query the list of running instances.
541

    
542
    """
543
    return backend.GetInstanceList(params[0])
544

    
545
  # node --------------------------
546

    
547
  @staticmethod
548
  def perspective_node_tcp_ping(params):
549
    """Do a TcpPing on the remote node.
550

    
551
    """
552
    return utils.TcpPing(params[1], params[2], timeout=params[3],
553
                         live_port_needed=params[4], source=params[0])
554

    
555
  @staticmethod
556
  def perspective_node_has_ip_address(params):
557
    """Checks if a node has the given ip address.
558

    
559
    """
560
    return utils.OwnIpAddress(params[0])
561

    
562
  @staticmethod
563
  def perspective_node_info(params):
564
    """Query node information.
565

    
566
    """
567
    vgname, hypervisor_type = params
568
    return backend.GetNodeInfo(vgname, hypervisor_type)
569

    
570
  @staticmethod
571
  def perspective_node_add(params):
572
    """Complete the registration of this node in the cluster.
573

    
574
    """
575
    return backend.AddNode(params[0], params[1], params[2],
576
                           params[3], params[4], params[5])
577

    
578
  @staticmethod
579
  def perspective_node_verify(params):
580
    """Run a verify sequence on this node.
581

    
582
    """
583
    return backend.VerifyNode(params[0], params[1])
584

    
585
  @staticmethod
586
  def perspective_node_start_master(params):
587
    """Promote this node to master status.
588

    
589
    """
590
    return backend.StartMaster(params[0], params[1])
591

    
592
  @staticmethod
593
  def perspective_node_stop_master(params):
594
    """Demote this node from master status.
595

    
596
    """
597
    return backend.StopMaster(params[0])
598

    
599
  @staticmethod
600
  def perspective_node_leave_cluster(params):
601
    """Cleanup after leaving a cluster.
602

    
603
    """
604
    return backend.LeaveCluster(params[0])
605

    
606
  @staticmethod
607
  def perspective_node_volumes(params):
608
    """Query the list of all logical volume groups.
609

    
610
    """
611
    return backend.NodeVolumes()
612

    
613
  @staticmethod
614
  def perspective_node_demote_from_mc(params):
615
    """Demote a node from the master candidate role.
616

    
617
    """
618
    return backend.DemoteFromMC()
619

    
620

    
621
  @staticmethod
622
  def perspective_node_powercycle(params):
623
    """Tries to powercycle the nod.
624

    
625
    """
626
    hypervisor_type = params[0]
627
    return backend.PowercycleNode(hypervisor_type)
628

    
629

    
630
  # cluster --------------------------
631

    
632
  @staticmethod
633
  def perspective_version(params):
634
    """Query version information.
635

    
636
    """
637
    return constants.PROTOCOL_VERSION
638

    
639
  @staticmethod
640
  def perspective_upload_file(params):
641
    """Upload a file.
642

    
643
    Note that the backend implementation imposes strict rules on which
644
    files are accepted.
645

    
646
    """
647
    return backend.UploadFile(*params)
648

    
649
  @staticmethod
650
  def perspective_master_info(params):
651
    """Query master information.
652

    
653
    """
654
    return backend.GetMasterInfo()
655

    
656
  @staticmethod
657
  def perspective_write_ssconf_files(params):
658
    """Write ssconf files.
659

    
660
    """
661
    (values,) = params
662
    return backend.WriteSsconfFiles(values)
663

    
664
  # os -----------------------
665

    
666
  @staticmethod
667
  def perspective_os_diagnose(params):
668
    """Query detailed information about existing OSes.
669

    
670
    """
671
    return backend.DiagnoseOS()
672

    
673
  @staticmethod
674
  def perspective_os_get(params):
675
    """Query information about a given OS.
676

    
677
    """
678
    name = params[0]
679
    os_obj = backend.OSFromDisk(name)
680
    return os_obj.ToDict()
681

    
682
  # hooks -----------------------
683

    
684
  @staticmethod
685
  def perspective_hooks_runner(params):
686
    """Run hook scripts.
687

    
688
    """
689
    hpath, phase, env = params
690
    hr = backend.HooksRunner()
691
    return hr.RunHooks(hpath, phase, env)
692

    
693
  # iallocator -----------------
694

    
695
  @staticmethod
696
  def perspective_iallocator_runner(params):
697
    """Run an iallocator script.
698

    
699
    """
700
    name, idata = params
701
    iar = backend.IAllocatorRunner()
702
    return iar.Run(name, idata)
703

    
704
  # test -----------------------
705

    
706
  @staticmethod
707
  def perspective_test_delay(params):
708
    """Run test delay.
709

    
710
    """
711
    duration = params[0]
712
    status, rval = utils.TestDelay(duration)
713
    if not status:
714
      raise backend.RPCFail(rval)
715
    return rval
716

    
717
  # file storage ---------------
718

    
719
  @staticmethod
720
  def perspective_file_storage_dir_create(params):
721
    """Create the file storage directory.
722

    
723
    """
724
    file_storage_dir = params[0]
725
    return backend.CreateFileStorageDir(file_storage_dir)
726

    
727
  @staticmethod
728
  def perspective_file_storage_dir_remove(params):
729
    """Remove the file storage directory.
730

    
731
    """
732
    file_storage_dir = params[0]
733
    return backend.RemoveFileStorageDir(file_storage_dir)
734

    
735
  @staticmethod
736
  def perspective_file_storage_dir_rename(params):
737
    """Rename the file storage directory.
738

    
739
    """
740
    old_file_storage_dir = params[0]
741
    new_file_storage_dir = params[1]
742
    return backend.RenameFileStorageDir(old_file_storage_dir,
743
                                        new_file_storage_dir)
744

    
745
  # jobs ------------------------
746

    
747
  @staticmethod
748
  @_RequireJobQueueLock
749
  def perspective_jobqueue_update(params):
750
    """Update job queue.
751

    
752
    """
753
    (file_name, content) = params
754
    return backend.JobQueueUpdate(file_name, content)
755

    
756
  @staticmethod
757
  @_RequireJobQueueLock
758
  def perspective_jobqueue_purge(params):
759
    """Purge job queue.
760

    
761
    """
762
    return backend.JobQueuePurge()
763

    
764
  @staticmethod
765
  @_RequireJobQueueLock
766
  def perspective_jobqueue_rename(params):
767
    """Rename a job queue file.
768

    
769
    """
770
    # TODO: What if a file fails to rename?
771
    return [backend.JobQueueRename(old, new) for old, new in params]
772

    
773
  @staticmethod
774
  def perspective_jobqueue_set_drain(params):
775
    """Set/unset the queue drain flag.
776

    
777
    """
778
    drain_flag = params[0]
779
    return backend.JobQueueSetDrainFlag(drain_flag)
780

    
781

    
782
  # hypervisor ---------------
783

    
784
  @staticmethod
785
  def perspective_hypervisor_validate_params(params):
786
    """Validate the hypervisor parameters.
787

    
788
    """
789
    (hvname, hvparams) = params
790
    return backend.ValidateHVParams(hvname, hvparams)
791

    
792

    
793
def CheckNoded(_, args):
794
  """Initial checks whether to run or exit with a failure.
795

    
796
  """
797
  if args: # noded doesn't take any arguments
798
    print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
799
                          sys.argv[0])
800
    sys.exit(constants.EXIT_FAILURE)
801

    
802

    
803
def ExecNoded(options, _):
804
  """Main node daemon function, executed with the PID file held.
805

    
806
  """
807
  global queue_lock # pylint: disable-msg=W0603
808

    
809
  # Read SSL certificate
810
  if options.ssl:
811
    ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
812
                                    ssl_cert_path=options.ssl_cert)
813
  else:
814
    ssl_params = None
815

    
816
  # Prepare job queue
817
  queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
818

    
819
  mainloop = daemon.Mainloop()
820
  server = NodeHttpServer(mainloop, options.bind_address, options.port,
821
                          ssl_params=ssl_params, ssl_verify_peer=True)
822
  server.Start()
823
  try:
824
    mainloop.Run()
825
  finally:
826
    server.Stop()
827

    
828

    
829
def main():
830
  """Main function for the node daemon.
831

    
832
  """
833
  parser = OptionParser(description="Ganeti node daemon",
834
                        usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
835
                        version="%%prog (ganeti) %s" %
836
                        constants.RELEASE_VERSION)
837
  dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
838
  dirs.append((constants.LOG_OS_DIR, 0750))
839
  dirs.append((constants.LOCK_DIR, 1777))
840
  daemon.GenericMain(constants.NODED, parser, dirs, CheckNoded, ExecNoded)
841

    
842

    
843
if __name__ == '__main__':
844
  main()