Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-noded @ 47f8a2d2

History | View | Annotate | Download (24.9 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti node daemon"""
23

    
24
# pylint: disable-msg=C0103,W0142
25

    
26
# C0103: Functions in this module need to have a given name structure,
27
# and the name of the daemon doesn't match
28

    
29
# W0142: Used * or ** magic, since we do use it extensively in this
30
# module
31

    
32
import os
33
import sys
34
import logging
35
import signal
36

    
37
from optparse import OptionParser
38

    
39
from ganeti import backend
40
from ganeti import constants
41
from ganeti import objects
42
from ganeti import errors
43
from ganeti import jstore
44
from ganeti import daemon
45
from ganeti import http
46
from ganeti import utils
47
from ganeti import storage
48
from ganeti import serializer
49

    
50
import ganeti.http.server # pylint: disable-msg=W0611
51

    
52

    
53
# Global job queue lock; initialized lazily by _PrepareQueueLock()
queue_lock = None
54

    
55

    
56
def _PrepareQueueLock():
57
  """Try to prepare the queue lock.
58

    
59
  @return: None for success, otherwise an exception object
60

    
61
  """
62
  global queue_lock # pylint: disable-msg=W0603
63

    
64
  if queue_lock is not None:
65
    return None
66

    
67
  # Prepare job queue
68
  try:
69
    queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
70
    return None
71
  except EnvironmentError, err:
72
    return err
73

    
74

    
75
def _RequireJobQueueLock(fn):
  """Decorator for job queue manipulating functions.

  Ensures the job queue is initialized and holds the queue lock in
  exclusive mode around the wrapped call.

  """
  QUEUE_LOCK_TIMEOUT = 10

  def _LockedCall(*args, **kwargs):
    # Several children may run at the same time, so take the lock in
    # exclusive, blocking mode, waiting up to 10 seconds.
    err = _PrepareQueueLock()
    if err is not None:
      raise errors.JobQueueError("Job queue failed initialization,"
                                 " cannot update jobs")
    queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
    try:
      return fn(*args, **kwargs)
    finally:
      queue_lock.Unlock()

  return _LockedCall
94

    
95

    
96
def _DecodeImportExportIO(ieio, ieioargs):
  """Decodes import/export I/O information.

  """
  if ieio == constants.IEIO_RAW_DISK:
    # A single serialized disk object
    assert len(ieioargs) == 1
    disk = objects.Disk.FromDict(ieioargs[0])
    return (disk, )
  elif ieio == constants.IEIO_SCRIPT:
    # A serialized disk object plus one extra argument
    assert len(ieioargs) == 2
    disk = objects.Disk.FromDict(ieioargs[0])
    return (disk, ieioargs[1])
  else:
    # Anything else is passed through unchanged
    return ieioargs
109

    
110

    
111
class NodeHttpServer(http.server.HttpServer):
  """The server implementation.

  This class holds all methods exposed over the RPC interface.

  Each incoming request is dispatched by L{HandleRequest} to the
  C{perspective_*} static method whose name matches the request path.

  """
  # too many public methods, and unused args - all methods get params
  # due to the API
  # pylint: disable-msg=R0904,W0613
  def __init__(self, *args, **kwargs):
    http.server.HttpServer.__init__(self, *args, **kwargs)
    # remembered so HandleRequest can signal the daemon itself on
    # QuitGanetiException
    self.noded_pid = os.getpid()

  def HandleRequest(self, req):
    """Handle a request.

    Only HTTP PUT is accepted; the request body is JSON-decoded and
    passed to the matching C{perspective_*} method. The return value
    is a JSON-encoded C{(success, payload)} tuple.

    """
    if req.request_method.upper() != http.HTTP_PUT:
      raise http.HttpBadRequest()

    path = req.request_path
    if path.startswith("/"):
      path = path[1:]

    method = getattr(self, "perspective_%s" % path, None)
    if method is None:
      raise http.HttpNotFound()

    try:
      result = (True, method(serializer.LoadJson(req.request_body)))

    except backend.RPCFail, err:
      # our custom failure exception; str(err) works fine if the
      # exception was constructed with a single argument, and in
      # this case, err.message == err.args[0] == str(err)
      result = (False, str(err))
    except errors.QuitGanetiException, err:
      # Tell parent to quit
      logging.info("Shutting down the node daemon, arguments: %s",
                   str(err.args))
      os.kill(self.noded_pid, signal.SIGTERM)
      # And return the error's arguments, which must be already in
      # correct tuple format
      result = err.args
    except Exception, err:
      logging.exception("Error in RPC call")
      result = (False, "Error while executing backend function: %s" % str(err))

    return serializer.DumpJson(result, indent=False)

  # the new block devices  --------------------------

  @staticmethod
  def perspective_blockdev_create(params):
    """Create a block device.

    """
    bdev_s, size, owner, on_primary, info = params
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevCreate(bdev, size, owner, on_primary, info)

  @staticmethod
  def perspective_blockdev_remove(params):
    """Remove a block device.

    """
    bdev_s = params[0]
    bdev = objects.Disk.FromDict(bdev_s)
    return backend.BlockdevRemove(bdev)

  @staticmethod
  def perspective_blockdev_rename(params):
    """Rename a list of block devices.

    """
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
    return backend.BlockdevRename(devlist)

  @staticmethod
  def perspective_blockdev_assemble(params):
    """Assemble a block device.

    """
    bdev_s, owner, on_primary = params
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevAssemble(bdev, owner, on_primary)

  @staticmethod
  def perspective_blockdev_shutdown(params):
    """Shutdown a block device.

    """
    bdev_s = params[0]
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevShutdown(bdev)

  @staticmethod
  def perspective_blockdev_addchildren(params):
    """Add a child to a mirror device.

    Note: this is only valid for mirror devices. It's the caller's duty
    to send a correct disk, otherwise we raise an error.

    """
    bdev_s, ndev_s = params
    bdev = objects.Disk.FromDict(bdev_s)
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
    if bdev is None or ndevs.count(None) > 0:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevAddchildren(bdev, ndevs)

  @staticmethod
  def perspective_blockdev_removechildren(params):
    """Remove a child from a mirror device.

    This is only valid for mirror devices, of course. It's the caller's
    duty to send a correct disk, otherwise we raise an error.

    """
    bdev_s, ndev_s = params
    bdev = objects.Disk.FromDict(bdev_s)
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
    if bdev is None or ndevs.count(None) > 0:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevRemovechildren(bdev, ndevs)

  @staticmethod
  def perspective_blockdev_getmirrorstatus(params):
    """Return the mirror status for a list of disks.

    """
    disks = [objects.Disk.FromDict(dsk_s)
             for dsk_s in params]
    return [status.ToDict()
            for status in backend.BlockdevGetmirrorstatus(disks)]

  @staticmethod
  def perspective_blockdev_find(params):
    """Expose the FindBlockDevice functionality for a disk.

    This will try to find but not activate a disk.

    """
    disk = objects.Disk.FromDict(params[0])

    result = backend.BlockdevFind(disk)
    if result is None:
      return None

    return result.ToDict()

  @staticmethod
  def perspective_blockdev_snapshot(params):
    """Create a snapshot device.

    Note that this is only valid for LVM disks, if we get passed
    something else we raise an exception. The snapshot device can be
    remove by calling the generic block device remove call.

    """
    cfbd = objects.Disk.FromDict(params[0])
    return backend.BlockdevSnapshot(cfbd)

  @staticmethod
  def perspective_blockdev_grow(params):
    """Grow a stack of devices.

    """
    cfbd = objects.Disk.FromDict(params[0])
    amount = params[1]
    return backend.BlockdevGrow(cfbd, amount)

  @staticmethod
  def perspective_blockdev_close(params):
    """Closes the given block devices.

    """
    disks = [objects.Disk.FromDict(cf) for cf in params[1]]
    return backend.BlockdevClose(params[0], disks)

  @staticmethod
  def perspective_blockdev_getsize(params):
    """Compute the sizes of the given block devices.

    """
    disks = [objects.Disk.FromDict(cf) for cf in params[0]]
    return backend.BlockdevGetsize(disks)

  @staticmethod
  def perspective_blockdev_export(params):
    """Export a block device to a remote node.

    """
    disk = objects.Disk.FromDict(params[0])
    dest_node, dest_path, cluster_name = params[1:]
    return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)

  # blockdev/drbd specific methods ----------

  @staticmethod
  def perspective_drbd_disconnect_net(params):
    """Disconnects the network connection of drbd disks.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    nodes_ip, disks = params
    disks = [objects.Disk.FromDict(cf) for cf in disks]
    return backend.DrbdDisconnectNet(nodes_ip, disks)

  @staticmethod
  def perspective_drbd_attach_net(params):
    """Attaches the network connection of drbd disks.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    nodes_ip, disks, instance_name, multimaster = params
    disks = [objects.Disk.FromDict(cf) for cf in disks]
    return backend.DrbdAttachNet(nodes_ip, disks,
                                     instance_name, multimaster)

  @staticmethod
  def perspective_drbd_wait_sync(params):
    """Wait until DRBD disks are synched.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    nodes_ip, disks = params
    disks = [objects.Disk.FromDict(cf) for cf in disks]
    return backend.DrbdWaitSync(nodes_ip, disks)

  # export/import  --------------------------

  @staticmethod
  def perspective_finalize_export(params):
    """Expose the finalize export functionality.

    """
    instance = objects.Instance.FromDict(params[0])

    # each entry is either a boolean (failed snapshot) or a serialized
    # disk object
    snap_disks = []
    for disk in params[1]:
      if isinstance(disk, bool):
        snap_disks.append(disk)
      else:
        snap_disks.append(objects.Disk.FromDict(disk))

    return backend.FinalizeExport(instance, snap_disks)

  @staticmethod
  def perspective_export_info(params):
    """Query information about an existing export on this node.

    The given path may not contain an export, in which case we return
    None.

    """
    path = params[0]
    return backend.ExportInfo(path)

  @staticmethod
  def perspective_export_list(params):
    """List the available exports on this node.

    Note that as opposed to export_info, which may query data about an
    export in any path, this only queries the standard Ganeti path
    (constants.EXPORT_DIR).

    """
    return backend.ListExports()

  @staticmethod
  def perspective_export_remove(params):
    """Remove an export.

    """
    export = params[0]
    return backend.RemoveExport(export)

  # volume  --------------------------

  @staticmethod
  def perspective_lv_list(params):
    """Query the list of logical volumes in a given volume group.

    """
    vgname = params[0]
    return backend.GetVolumeList(vgname)

  @staticmethod
  def perspective_vg_list(params):
    """Query the list of volume groups.

    """
    return backend.ListVolumeGroups()

  # Storage --------------------------

  @staticmethod
  def perspective_storage_list(params):
    """Get list of storage units.

    """
    (su_name, su_args, name, fields) = params
    return storage.GetStorage(su_name, *su_args).List(name, fields)

  @staticmethod
  def perspective_storage_modify(params):
    """Modify a storage unit.

    """
    (su_name, su_args, name, changes) = params
    return storage.GetStorage(su_name, *su_args).Modify(name, changes)

  @staticmethod
  def perspective_storage_execute(params):
    """Execute an operation on a storage unit.

    """
    (su_name, su_args, name, op) = params
    return storage.GetStorage(su_name, *su_args).Execute(name, op)

  # bridge  --------------------------

  @staticmethod
  def perspective_bridges_exist(params):
    """Check if all bridges given exist on this node.

    """
    bridges_list = params[0]
    return backend.BridgesExist(bridges_list)

  # instance  --------------------------

  @staticmethod
  def perspective_instance_os_add(params):
    """Install an OS on a given instance.

    """
    inst_s = params[0]
    inst = objects.Instance.FromDict(inst_s)
    reinstall = params[1]
    debug = params[2]
    return backend.InstanceOsAdd(inst, reinstall, debug)

  @staticmethod
  def perspective_instance_run_rename(params):
    """Runs the OS rename script for an instance.

    """
    inst_s, old_name, debug = params
    inst = objects.Instance.FromDict(inst_s)
    return backend.RunRenameInstance(inst, old_name, debug)

  @staticmethod
  def perspective_instance_shutdown(params):
    """Shutdown an instance.

    """
    instance = objects.Instance.FromDict(params[0])
    timeout = params[1]
    return backend.InstanceShutdown(instance, timeout)

  @staticmethod
  def perspective_instance_start(params):
    """Start an instance.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.StartInstance(instance)

  @staticmethod
  def perspective_migration_info(params):
    """Gather information about an instance to be migrated.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.MigrationInfo(instance)

  @staticmethod
  def perspective_accept_instance(params):
    """Prepare the node to accept an instance.

    """
    instance, info, target = params
    instance = objects.Instance.FromDict(instance)
    return backend.AcceptInstance(instance, info, target)

  @staticmethod
  def perspective_finalize_migration(params):
    """Finalize the instance migration.

    """
    instance, info, success = params
    instance = objects.Instance.FromDict(instance)
    return backend.FinalizeMigration(instance, info, success)

  @staticmethod
  def perspective_instance_migrate(params):
    """Migrates an instance.

    """
    instance, target, live = params
    instance = objects.Instance.FromDict(instance)
    return backend.MigrateInstance(instance, target, live)

  @staticmethod
  def perspective_instance_reboot(params):
    """Reboot an instance.

    """
    instance = objects.Instance.FromDict(params[0])
    reboot_type = params[1]
    shutdown_timeout = params[2]
    return backend.InstanceReboot(instance, reboot_type, shutdown_timeout)

  @staticmethod
  def perspective_instance_info(params):
    """Query instance information.

    """
    return backend.GetInstanceInfo(params[0], params[1])

  @staticmethod
  def perspective_instance_migratable(params):
    """Query whether the specified instance can be migrated.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.GetInstanceMigratable(instance)

  @staticmethod
  def perspective_all_instances_info(params):
    """Query information about all instances.

    """
    return backend.GetAllInstancesInfo(params[0])

  @staticmethod
  def perspective_instance_list(params):
    """Query the list of running instances.

    """
    return backend.GetInstanceList(params[0])

  # node --------------------------

  @staticmethod
  def perspective_node_tcp_ping(params):
    """Do a TcpPing on the remote node.

    """
    return utils.TcpPing(params[1], params[2], timeout=params[3],
                         live_port_needed=params[4], source=params[0])

  @staticmethod
  def perspective_node_has_ip_address(params):
    """Checks if a node has the given ip address.

    """
    return utils.OwnIpAddress(params[0])

  @staticmethod
  def perspective_node_info(params):
    """Query node information.

    """
    vgname, hypervisor_type = params
    return backend.GetNodeInfo(vgname, hypervisor_type)

  @staticmethod
  def perspective_node_add(params):
    """Complete the registration of this node in the cluster.

    """
    return backend.AddNode(params[0], params[1], params[2],
                           params[3], params[4], params[5])

  @staticmethod
  def perspective_node_verify(params):
    """Run a verify sequence on this node.

    """
    return backend.VerifyNode(params[0], params[1])

  @staticmethod
  def perspective_node_start_master(params):
    """Promote this node to master status.

    """
    return backend.StartMaster(params[0], params[1])

  @staticmethod
  def perspective_node_stop_master(params):
    """Demote this node from master status.

    """
    return backend.StopMaster(params[0])

  @staticmethod
  def perspective_node_leave_cluster(params):
    """Cleanup after leaving a cluster.

    """
    return backend.LeaveCluster(params[0])

  @staticmethod
  def perspective_node_volumes(params):
    """Query the list of all logical volume groups.

    """
    return backend.NodeVolumes()

  @staticmethod
  def perspective_node_demote_from_mc(params):
    """Demote a node from the master candidate role.

    """
    return backend.DemoteFromMC()


  @staticmethod
  def perspective_node_powercycle(params):
    """Tries to powercycle the node.

    """
    hypervisor_type = params[0]
    return backend.PowercycleNode(hypervisor_type)


  # cluster --------------------------

  @staticmethod
  def perspective_version(params):
    """Query version information.

    """
    return constants.PROTOCOL_VERSION

  @staticmethod
  def perspective_upload_file(params):
    """Upload a file.

    Note that the backend implementation imposes strict rules on which
    files are accepted.

    """
    return backend.UploadFile(*params)

  @staticmethod
  def perspective_master_info(params):
    """Query master information.

    """
    return backend.GetMasterInfo()

  @staticmethod
  def perspective_write_ssconf_files(params):
    """Write ssconf files.

    """
    (values,) = params
    return backend.WriteSsconfFiles(values)

  # os -----------------------

  @staticmethod
  def perspective_os_diagnose(params):
    """Query detailed information about existing OSes.

    """
    return backend.DiagnoseOS()

  @staticmethod
  def perspective_os_get(params):
    """Query information about a given OS.

    """
    name = params[0]
    os_obj = backend.OSFromDisk(name)
    return os_obj.ToDict()

  # hooks -----------------------

  @staticmethod
  def perspective_hooks_runner(params):
    """Run hook scripts.

    """
    hpath, phase, env = params
    hr = backend.HooksRunner()
    return hr.RunHooks(hpath, phase, env)

  # iallocator -----------------

  @staticmethod
  def perspective_iallocator_runner(params):
    """Run an iallocator script.

    """
    name, idata = params
    iar = backend.IAllocatorRunner()
    return iar.Run(name, idata)

  # test -----------------------

  @staticmethod
  def perspective_test_delay(params):
    """Run test delay.

    """
    duration = params[0]
    status, rval = utils.TestDelay(duration)
    if not status:
      raise backend.RPCFail(rval)
    return rval

  # file storage ---------------

  @staticmethod
  def perspective_file_storage_dir_create(params):
    """Create the file storage directory.

    """
    file_storage_dir = params[0]
    return backend.CreateFileStorageDir(file_storage_dir)

  @staticmethod
  def perspective_file_storage_dir_remove(params):
    """Remove the file storage directory.

    """
    file_storage_dir = params[0]
    return backend.RemoveFileStorageDir(file_storage_dir)

  @staticmethod
  def perspective_file_storage_dir_rename(params):
    """Rename the file storage directory.

    """
    old_file_storage_dir = params[0]
    new_file_storage_dir = params[1]
    return backend.RenameFileStorageDir(old_file_storage_dir,
                                        new_file_storage_dir)

  # jobs ------------------------

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_update(params):
    """Update job queue.

    """
    (file_name, content) = params
    return backend.JobQueueUpdate(file_name, content)

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_purge(params):
    """Purge job queue.

    """
    return backend.JobQueuePurge()

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_rename(params):
    """Rename a job queue file.

    """
    # TODO: What if a file fails to rename?
    return [backend.JobQueueRename(old, new) for old, new in params]

  @staticmethod
  def perspective_jobqueue_set_drain(params):
    """Set/unset the queue drain flag.

    """
    drain_flag = params[0]
    return backend.JobQueueSetDrainFlag(drain_flag)


  # hypervisor ---------------

  @staticmethod
  def perspective_hypervisor_validate_params(params):
    """Validate the hypervisor parameters.

    """
    (hvname, hvparams) = params
    return backend.ValidateHVParams(hvname, hvparams)

  # Crypto

  @staticmethod
  def perspective_x509_cert_create(params):
    """Creates a new X509 certificate for SSL/TLS.

    """
    (validity, ) = params
    return backend.CreateX509Certificate(validity)

  @staticmethod
  def perspective_x509_cert_remove(params):
    """Removes a X509 certificate.

    """
    (name, ) = params
    return backend.RemoveX509Certificate(name)

  # Import and export

  @staticmethod
  def perspective_import_start(params):
    """Starts an import daemon.

    """
    (x509_key_name, source_x509_ca, instance, dest, dest_args) = params
    return backend.StartImportExportDaemon(constants.IEM_IMPORT,
                                           x509_key_name, source_x509_ca,
                                           None, None,
                                           objects.Instance.FromDict(instance),
                                           dest,
                                           _DecodeImportExportIO(dest,
                                                                 dest_args))
  @staticmethod
  def perspective_export_start(params):
    """Starts an export daemon.

    """
    (x509_key_name, dest_x509_ca, host, port, instance,
     source, source_args) = params
    return backend.StartImportExportDaemon(constants.IEM_EXPORT,
                                           x509_key_name, dest_x509_ca,
                                           host, port,
                                           objects.Instance.FromDict(instance),
                                           source,
                                           _DecodeImportExportIO(source,
                                                                 source_args))

  @staticmethod
  def perspective_impexp_status(params):
    """Retrieves the status of an import or export daemon.

    """
    return backend.GetImportExportStatus(params[0])

  @staticmethod
  def perspective_impexp_abort(params):
    """Aborts an import or export.

    """
    return backend.AbortImportExport(params[0])

  @staticmethod
  def perspective_impexp_cleanup(params):
    """Cleans up after an import or export.

    """
    return backend.CleanupImportExport(params[0])
882

    
883

    
884
def CheckNoded(_, args):
  """Initial checks whether to run or exit with a failure.

  """
  # noded doesn't take any arguments
  if args:
    usage = "Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" % sys.argv[0]
    sys.stderr.write(usage + "\n")
    sys.exit(constants.EXIT_FAILURE)
892

    
893

    
894
def ExecNoded(options, _):
  """Main node daemon function, executed with the PID file held.

  Sets up SSL (if enabled via options), tries to initialize the job
  queue, then runs the HTTP server inside the main loop until it is
  stopped.

  """
  # Read SSL certificate
  if options.ssl:
    ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
                                    ssl_cert_path=options.ssl_cert)
  else:
    ssl_params = None

  err = _PrepareQueueLock()
  if err is not None:
    # this might be some kind of file-system/permission error; while
    # this breaks the job queue functionality, we shouldn't prevent
    # startup of the whole node daemon because of this
    logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)

  mainloop = daemon.Mainloop()
  server = NodeHttpServer(mainloop, options.bind_address, options.port,
                          ssl_params=ssl_params, ssl_verify_peer=True)
  server.Start()
  try:
    mainloop.Run()
  finally:
    # make sure the server is stopped even if the main loop raises
    server.Stop()
920

    
921

    
922
def main():
923
  """Main function for the node daemon.
924

    
925
  """
926
  parser = OptionParser(description="Ganeti node daemon",
927
                        usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
928
                        version="%%prog (ganeti) %s" %
929
                        constants.RELEASE_VERSION)
930
  dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
931
  dirs.append((constants.LOG_OS_DIR, 0750))
932
  dirs.append((constants.LOCK_DIR, 1777))
933
  dirs.append((constants.CRYPTO_KEYS_DIR, constants.CRYPTO_KEYS_DIR_MODE))
934
  dirs.append((constants.IMPORT_EXPORT_DIR, constants.IMPORT_EXPORT_DIR_MODE))
935
  daemon.GenericMain(constants.NODED, parser, dirs, CheckNoded, ExecNoded,
936
                     default_ssl_cert=constants.NODED_CERT_FILE,
937
                     default_ssl_key=constants.NODED_CERT_FILE)
938

    
939

    
940
# Script entry point
if __name__ == '__main__':
  main()