#!/usr/bin/python
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Ganeti node daemon"""

# pylint: disable-msg=C0103,W0142

# C0103: Functions in this module need to have a given name structure,
# and the name of the daemon doesn't match

# W0142: Used * or ** magic, since we do use it extensively in this
# module

import os
import sys
import logging
import signal

from optparse import OptionParser

from ganeti import backend
from ganeti import constants
from ganeti import objects
from ganeti import errors
from ganeti import jstore
from ganeti import daemon
from ganeti import http
from ganeti import utils
from ganeti import storage

import ganeti.http.server # pylint: disable-msg=W0611


queue_lock = None
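# Module-level job queue lock; _PrepareQueueLock() below initialises it lazily,
# and the perspective_jobqueue_* RPC handlers take it via _RequireJobQueueLock.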


def _PrepareQueueLock():
  """Try to prepare the queue lock.

  @return: None for success, otherwise an exception object

  """
  global queue_lock # pylint: disable-msg=W0603

  if queue_lock is not None:
    return None

  # Prepare job queue
  try:
    queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
    return None
  except EnvironmentError, err:
    return err


def _RequireJobQueueLock(fn):
  """Decorator for job queue manipulating functions.

  """
  QUEUE_LOCK_TIMEOUT = 10

  def wrapper(*args, **kwargs):
    # Locking in exclusive, blocking mode because there could be several
    # children running at the same time. Waiting up to 10 seconds.
    if _PrepareQueueLock() is not None:
      raise errors.JobQueueError("Job queue failed initialization,"
                                 " cannot update jobs")
    queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
    try:
      return fn(*args, **kwargs)
    finally:
      queue_lock.Unlock()

  return wrapper
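
# _RequireJobQueueLock is applied below, together with @staticmethod, to the
# perspective_jobqueue_* handlers of NodeHttpServer, e.g.:
#
#   @staticmethod
#   @_RequireJobQueueLock
#   def perspective_jobqueue_update(params):
#     ...
#
# so each job queue RPC runs with the queue lock held and releases it even if
# the backend call raises.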


class NodeHttpServer(http.server.HttpServer):
  """The server implementation.

  This class holds all methods exposed over the RPC interface.

  """
  # too many public methods, and unused args - all methods get params
  # due to the API
  # pylint: disable-msg=R0904,W0613
  def __init__(self, *args, **kwargs):
    http.server.HttpServer.__init__(self, *args, **kwargs)
    self.noded_pid = os.getpid()

  def HandleRequest(self, req):
    """Handle a request.

    """
    if req.request_method.upper() != http.HTTP_PUT:
      raise http.HttpBadRequest()

    path = req.request_path
    if path.startswith("/"):
      path = path[1:]

    method = getattr(self, "perspective_%s" % path, None)
    if method is None:
      raise http.HttpNotFound()

    try:
      rvalue = method(req.request_body)
      return True, rvalue

    except backend.RPCFail, err:
      # our custom failure exception; str(err) works fine if the
      # exception was constructed with a single argument, and in
      # this case, err.message == err.args[0] == str(err)
      return (False, str(err))
    except errors.QuitGanetiException, err:
      # Tell parent to quit
      logging.info("Shutting down the node daemon, arguments: %s",
                   str(err.args))
      os.kill(self.noded_pid, signal.SIGTERM)
      # And return the error's arguments, which must be already in
      # correct tuple format
      return err.args
    except Exception, err:
      logging.exception("Error in RPC call")
      return False, "Error while executing backend function: %s" % str(err)
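
  # Dispatch convention (see HandleRequest above): a PUT to "/<name>" is routed
  # to the perspective_<name> method, e.g. a PUT to "/blockdev_create" ends up
  # in perspective_blockdev_create below.  The request body (presumably already
  # deserialised into a parameter sequence by the HTTP layer) is passed through
  # as "params", and the return value is wrapped into the (status, payload)
  # tuple built above.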

  # the new block devices  --------------------------

  @staticmethod
  def perspective_blockdev_create(params):
    """Create a block device.

    """
    bdev_s, size, owner, on_primary, info = params
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevCreate(bdev, size, owner, on_primary, info)

  @staticmethod
  def perspective_blockdev_remove(params):
    """Remove a block device.

    """
    bdev_s = params[0]
    bdev = objects.Disk.FromDict(bdev_s)
    return backend.BlockdevRemove(bdev)

  @staticmethod
  def perspective_blockdev_rename(params):
    """Rename a block device.

    """
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
    return backend.BlockdevRename(devlist)

  @staticmethod
  def perspective_blockdev_assemble(params):
    """Assemble a block device.

    """
    bdev_s, owner, on_primary = params
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevAssemble(bdev, owner, on_primary)

  @staticmethod
  def perspective_blockdev_shutdown(params):
    """Shutdown a block device.

    """
    bdev_s = params[0]
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevShutdown(bdev)

  @staticmethod
  def perspective_blockdev_addchildren(params):
    """Add a child to a mirror device.

    Note: this is only valid for mirror devices. It's the caller's duty
    to send a correct disk, otherwise we raise an error.

    """
    bdev_s, ndev_s = params
    bdev = objects.Disk.FromDict(bdev_s)
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
    if bdev is None or ndevs.count(None) > 0:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevAddchildren(bdev, ndevs)

  @staticmethod
  def perspective_blockdev_removechildren(params):
    """Remove a child from a mirror device.

    This is only valid for mirror devices, of course. It's the caller's
    duty to send a correct disk, otherwise we raise an error.

    """
    bdev_s, ndev_s = params
    bdev = objects.Disk.FromDict(bdev_s)
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
    if bdev is None or ndevs.count(None) > 0:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevRemovechildren(bdev, ndevs)

  @staticmethod
  def perspective_blockdev_getmirrorstatus(params):
    """Return the mirror status for a list of disks.

    """
    disks = [objects.Disk.FromDict(dsk_s)
             for dsk_s in params]
    return [status.ToDict()
            for status in backend.BlockdevGetmirrorstatus(disks)]

  @staticmethod
  def perspective_blockdev_find(params):
    """Expose the FindBlockDevice functionality for a disk.

    This will try to find but not activate a disk.

    """
    disk = objects.Disk.FromDict(params[0])

    result = backend.BlockdevFind(disk)
    if result is None:
      return None

    return result.ToDict()

  @staticmethod
  def perspective_blockdev_snapshot(params):
    """Create a snapshot device.

    Note that this is only valid for LVM disks, if we get passed
    something else we raise an exception. The snapshot device can be
    removed by calling the generic block device remove call.

    """
    cfbd = objects.Disk.FromDict(params[0])
    return backend.BlockdevSnapshot(cfbd)

  @staticmethod
  def perspective_blockdev_grow(params):
    """Grow a stack of devices.

    """
    cfbd = objects.Disk.FromDict(params[0])
    amount = params[1]
    return backend.BlockdevGrow(cfbd, amount)

  @staticmethod
  def perspective_blockdev_close(params):
    """Closes the given block devices.

    """
    disks = [objects.Disk.FromDict(cf) for cf in params[1]]
    return backend.BlockdevClose(params[0], disks)

  @staticmethod
  def perspective_blockdev_getsize(params):
    """Compute the sizes of the given block devices.

    """
    disks = [objects.Disk.FromDict(cf) for cf in params[0]]
    return backend.BlockdevGetsize(disks)

  @staticmethod
  def perspective_blockdev_export(params):
    """Export a block device to a remote node.

    """
    disk = objects.Disk.FromDict(params[0])
    dest_node, dest_path, cluster_name = params[1:]
    return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)

  # blockdev/drbd specific methods ----------

  @staticmethod
  def perspective_drbd_disconnect_net(params):
    """Disconnects the network connection of drbd disks.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    nodes_ip, disks = params
    disks = [objects.Disk.FromDict(cf) for cf in disks]
    return backend.DrbdDisconnectNet(nodes_ip, disks)

  @staticmethod
  def perspective_drbd_attach_net(params):
    """Attaches the network connection of drbd disks.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    nodes_ip, disks, instance_name, multimaster = params
    disks = [objects.Disk.FromDict(cf) for cf in disks]
    return backend.DrbdAttachNet(nodes_ip, disks,
                                 instance_name, multimaster)

  @staticmethod
  def perspective_drbd_wait_sync(params):
    """Wait until DRBD disks are synced.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    nodes_ip, disks = params
    disks = [objects.Disk.FromDict(cf) for cf in disks]
    return backend.DrbdWaitSync(nodes_ip, disks)

  # export/import  --------------------------

  @staticmethod
  def perspective_snapshot_export(params):
    """Export a given snapshot.

    """
    disk = objects.Disk.FromDict(params[0])
    dest_node = params[1]
    instance = objects.Instance.FromDict(params[2])
    cluster_name = params[3]
    dev_idx = params[4]
    return backend.ExportSnapshot(disk, dest_node, instance,
                                  cluster_name, dev_idx)

  @staticmethod
  def perspective_finalize_export(params):
    """Expose the finalize export functionality.

    """
    instance = objects.Instance.FromDict(params[0])
    snap_disks = [objects.Disk.FromDict(str_data)
                  for str_data in params[1]]
    return backend.FinalizeExport(instance, snap_disks)

  @staticmethod
  def perspective_export_info(params):
    """Query information about an existing export on this node.

    The given path may not contain an export, in which case we return
    None.

    """
    path = params[0]
    return backend.ExportInfo(path)

  @staticmethod
  def perspective_export_list(params):
    """List the available exports on this node.

    Note that as opposed to export_info, which may query data about an
    export in any path, this only queries the standard Ganeti path
    (constants.EXPORT_DIR).

    """
    return backend.ListExports()

  @staticmethod
  def perspective_export_remove(params):
    """Remove an export.

    """
    export = params[0]
    return backend.RemoveExport(export)

  # volume  --------------------------

  @staticmethod
  def perspective_lv_list(params):
    """Query the list of logical volumes in a given volume group.

    """
    vgname = params[0]
    return backend.GetVolumeList(vgname)

  @staticmethod
  def perspective_vg_list(params):
    """Query the list of volume groups.

    """
    return backend.ListVolumeGroups()

  # Storage --------------------------

  @staticmethod
  def perspective_storage_list(params):
    """Get list of storage units.

    """
    (su_name, su_args, name, fields) = params
    return storage.GetStorage(su_name, *su_args).List(name, fields)

  @staticmethod
  def perspective_storage_modify(params):
    """Modify a storage unit.

    """
    (su_name, su_args, name, changes) = params
    return storage.GetStorage(su_name, *su_args).Modify(name, changes)

  @staticmethod
  def perspective_storage_execute(params):
    """Execute an operation on a storage unit.

    """
    (su_name, su_args, name, op) = params
    return storage.GetStorage(su_name, *su_args).Execute(name, op)

  # bridge  --------------------------

  @staticmethod
  def perspective_bridges_exist(params):
    """Check if all bridges given exist on this node.

    """
    bridges_list = params[0]
    return backend.BridgesExist(bridges_list)

  # instance  --------------------------

  @staticmethod
  def perspective_instance_os_add(params):
    """Install an OS on a given instance.

    """
    inst_s = params[0]
    inst = objects.Instance.FromDict(inst_s)
    reinstall = params[1]
    return backend.InstanceOsAdd(inst, reinstall)

  @staticmethod
  def perspective_instance_run_rename(params):
    """Runs the OS rename script for an instance.

    """
    inst_s, old_name = params
    inst = objects.Instance.FromDict(inst_s)
    return backend.RunRenameInstance(inst, old_name)

  @staticmethod
  def perspective_instance_os_import(params):
    """Run the import function of an OS onto a given instance.

    """
    inst_s, src_node, src_images, cluster_name = params
    inst = objects.Instance.FromDict(inst_s)
    return backend.ImportOSIntoInstance(inst, src_node, src_images,
                                        cluster_name)

  @staticmethod
  def perspective_instance_shutdown(params):
    """Shutdown an instance.

    """
    instance = objects.Instance.FromDict(params[0])
    timeout = params[1]
    return backend.InstanceShutdown(instance, timeout)

  @staticmethod
  def perspective_instance_start(params):
    """Start an instance.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.StartInstance(instance)

  @staticmethod
  def perspective_migration_info(params):
    """Gather information about an instance to be migrated.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.MigrationInfo(instance)

  @staticmethod
  def perspective_accept_instance(params):
    """Prepare the node to accept an instance.

    """
    instance, info, target = params
    instance = objects.Instance.FromDict(instance)
    return backend.AcceptInstance(instance, info, target)

  @staticmethod
  def perspective_finalize_migration(params):
    """Finalize the instance migration.

    """
    instance, info, success = params
    instance = objects.Instance.FromDict(instance)
    return backend.FinalizeMigration(instance, info, success)

  @staticmethod
  def perspective_instance_migrate(params):
    """Migrates an instance.

    """
    instance, target, live = params
    instance = objects.Instance.FromDict(instance)
    return backend.MigrateInstance(instance, target, live)

  @staticmethod
  def perspective_instance_reboot(params):
    """Reboot an instance.

    """
    instance = objects.Instance.FromDict(params[0])
    reboot_type = params[1]
    shutdown_timeout = params[2]
    return backend.InstanceReboot(instance, reboot_type, shutdown_timeout)

  @staticmethod
  def perspective_instance_info(params):
    """Query instance information.

    """
    return backend.GetInstanceInfo(params[0], params[1])

  @staticmethod
  def perspective_instance_migratable(params):
    """Query whether the specified instance can be migrated.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.GetInstanceMigratable(instance)

  @staticmethod
  def perspective_all_instances_info(params):
    """Query information about all instances.

    """
    return backend.GetAllInstancesInfo(params[0])

  @staticmethod
  def perspective_instance_list(params):
    """Query the list of running instances.

    """
    return backend.GetInstanceList(params[0])

  # node --------------------------

  @staticmethod
  def perspective_node_tcp_ping(params):
    """Do a TcpPing on the remote node.

    """
    return utils.TcpPing(params[1], params[2], timeout=params[3],
                         live_port_needed=params[4], source=params[0])

  @staticmethod
  def perspective_node_has_ip_address(params):
    """Checks if a node has the given ip address.

    """
    return utils.OwnIpAddress(params[0])

  @staticmethod
  def perspective_node_info(params):
    """Query node information.

    """
    vgname, hypervisor_type = params
    return backend.GetNodeInfo(vgname, hypervisor_type)

  @staticmethod
  def perspective_node_add(params):
    """Complete the registration of this node in the cluster.

    """
    return backend.AddNode(params[0], params[1], params[2],
                           params[3], params[4], params[5])

  @staticmethod
  def perspective_node_verify(params):
    """Run a verify sequence on this node.

    """
    return backend.VerifyNode(params[0], params[1])

  @staticmethod
  def perspective_node_start_master(params):
    """Promote this node to master status.

    """
    return backend.StartMaster(params[0], params[1])

  @staticmethod
  def perspective_node_stop_master(params):
    """Demote this node from master status.

    """
    return backend.StopMaster(params[0])

  @staticmethod
  def perspective_node_leave_cluster(params):
    """Cleanup after leaving a cluster.

    """
    return backend.LeaveCluster(params[0])

  @staticmethod
  def perspective_node_volumes(params):
    """Query the list of all logical volumes on this node.

    """
    return backend.NodeVolumes()

  @staticmethod
  def perspective_node_demote_from_mc(params):
    """Demote a node from the master candidate role.

    """
    return backend.DemoteFromMC()


  @staticmethod
  def perspective_node_powercycle(params):
    """Tries to powercycle the node.

    """
    hypervisor_type = params[0]
    return backend.PowercycleNode(hypervisor_type)


  # cluster --------------------------

  @staticmethod
  def perspective_version(params):
    """Query version information.

    """
    return constants.PROTOCOL_VERSION

  @staticmethod
  def perspective_upload_file(params):
    """Upload a file.

    Note that the backend implementation imposes strict rules on which
    files are accepted.

    """
    return backend.UploadFile(*params)

  @staticmethod
  def perspective_master_info(params):
    """Query master information.

    """
    return backend.GetMasterInfo()

  @staticmethod
  def perspective_write_ssconf_files(params):
    """Write ssconf files.

    """
    (values,) = params
    return backend.WriteSsconfFiles(values)

  # os -----------------------

  @staticmethod
  def perspective_os_diagnose(params):
    """Query detailed information about existing OSes.

    """
    return backend.DiagnoseOS()

  @staticmethod
  def perspective_os_get(params):
    """Query information about a given OS.

    """
    name = params[0]
    os_obj = backend.OSFromDisk(name)
    return os_obj.ToDict()

  # hooks -----------------------

  @staticmethod
  def perspective_hooks_runner(params):
    """Run hook scripts.

    """
    hpath, phase, env = params
    hr = backend.HooksRunner()
    return hr.RunHooks(hpath, phase, env)

  # iallocator -----------------

  @staticmethod
  def perspective_iallocator_runner(params):
    """Run an iallocator script.

    """
    name, idata = params
    iar = backend.IAllocatorRunner()
    return iar.Run(name, idata)

  # test -----------------------

  @staticmethod
  def perspective_test_delay(params):
    """Run test delay.

    """
    duration = params[0]
    status, rval = utils.TestDelay(duration)
    if not status:
      raise backend.RPCFail(rval)
    return rval

  # file storage ---------------

  @staticmethod
  def perspective_file_storage_dir_create(params):
    """Create the file storage directory.

    """
    file_storage_dir = params[0]
    return backend.CreateFileStorageDir(file_storage_dir)

  @staticmethod
  def perspective_file_storage_dir_remove(params):
    """Remove the file storage directory.

    """
    file_storage_dir = params[0]
    return backend.RemoveFileStorageDir(file_storage_dir)

  @staticmethod
  def perspective_file_storage_dir_rename(params):
    """Rename the file storage directory.

    """
    old_file_storage_dir = params[0]
    new_file_storage_dir = params[1]
    return backend.RenameFileStorageDir(old_file_storage_dir,
                                        new_file_storage_dir)

  # jobs ------------------------

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_update(params):
    """Update job queue.

    """
    (file_name, content) = params
    return backend.JobQueueUpdate(file_name, content)

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_purge(params):
    """Purge job queue.

    """
    return backend.JobQueuePurge()

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_rename(params):
    """Rename a job queue file.

    """
    # TODO: What if a file fails to rename?
    return [backend.JobQueueRename(old, new) for old, new in params]

  @staticmethod
  def perspective_jobqueue_set_drain(params):
    """Set/unset the queue drain flag.

    """
    drain_flag = params[0]
    return backend.JobQueueSetDrainFlag(drain_flag)


  # hypervisor ---------------

  @staticmethod
  def perspective_hypervisor_validate_params(params):
    """Validate the hypervisor parameters.

    """
    (hvname, hvparams) = params
    return backend.ValidateHVParams(hvname, hvparams)


def CheckNoded(_, args):
  """Initial checks to decide whether to run or exit with a failure.

  """
  if args: # noded doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
                          sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)


def ExecNoded(options, _):
  """Main node daemon function, executed with the PID file held.

  """
  # Read SSL certificate
  if options.ssl:
    ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
                                    ssl_cert_path=options.ssl_cert)
  else:
    ssl_params = None
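
  # Note: ssl_params is handed to the NodeHttpServer below together with
  # ssl_verify_peer=True, so RPC clients are expected to authenticate with a
  # certificate as well (presumably the cluster-wide certificate in a Ganeti
  # cluster).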

  err = _PrepareQueueLock()
  if err is not None:
    # this might be some kind of file-system/permission error; while
    # this breaks the job queue functionality, we shouldn't prevent
    # startup of the whole node daemon because of this
    logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)

  mainloop = daemon.Mainloop()
  server = NodeHttpServer(mainloop, options.bind_address, options.port,
                          ssl_params=ssl_params, ssl_verify_peer=True)
  server.Start()
  try:
    mainloop.Run()
  finally:
    server.Stop()


def main():
  """Main function for the node daemon.

  """
  parser = OptionParser(description="Ganeti node daemon",
                        usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
  dirs.append((constants.LOG_OS_DIR, 0750))
  dirs.append((constants.LOCK_DIR, 1777))
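  # The modes above are Python 2 octal literals: 0750 is rwxr-x--- for the OS
  # log directory, while 1777 gives the lock directory /tmp-style permissions
  # (world-writable with the sticky bit set).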
  daemon.GenericMain(constants.NODED, parser, dirs, CheckNoded, ExecNoded)


if __name__ == '__main__':
  main()