Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-noded @ 6abf7f2c

History | View | Annotate | Download (25.4 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti node daemon"""
23

    
24
# pylint: disable-msg=C0103,W0142
25

    
26
# C0103: Functions in this module need to have a given name structure,
27
# and the name of the daemon doesn't match
28

    
29
# W0142: Used * or ** magic, since we do use it extensively in this
30
# module
31

    
32
import os
33
import sys
34
import logging
35
import signal
36

    
37
from optparse import OptionParser
38

    
39
from ganeti import backend
40
from ganeti import constants
41
from ganeti import objects
42
from ganeti import errors
43
from ganeti import jstore
44
from ganeti import daemon
45
from ganeti import http
46
from ganeti import utils
47
from ganeti import storage
48
from ganeti import serializer
49

    
50
import ganeti.http.server # pylint: disable-msg=W0611
51

    
52

    
53
queue_lock = None
54

    
55

    
56
def _PrepareQueueLock():
57
  """Try to prepare the queue lock.
58

    
59
  @return: None for success, otherwise an exception object
60

    
61
  """
62
  global queue_lock # pylint: disable-msg=W0603
63

    
64
  if queue_lock is not None:
65
    return None
66

    
67
  # Prepare job queue
68
  try:
69
    queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
70
    return None
71
  except EnvironmentError, err:
72
    return err
73

    
74

    
75
def _RequireJobQueueLock(fn):
76
  """Decorator for job queue manipulating functions.
77

    
78
  """
79
  QUEUE_LOCK_TIMEOUT = 10
80

    
81
  def wrapper(*args, **kwargs):
82
    # Locking in exclusive, blocking mode because there could be several
83
    # children running at the same time. Waiting up to 10 seconds.
84
    if _PrepareQueueLock() is not None:
85
      raise errors.JobQueueError("Job queue failed initialization,"
86
                                 " cannot update jobs")
87
    queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
88
    try:
89
      return fn(*args, **kwargs)
90
    finally:
91
      queue_lock.Unlock()
92

    
93
  return wrapper
94

    
95

    
96
def _DecodeImportExportIO(ieio, ieioargs):
97
  """Decodes import/export I/O information.
98

    
99
  """
100
  if ieio == constants.IEIO_RAW_DISK:
101
    assert len(ieioargs) == 1
102
    return (objects.Disk.FromDict(ieioargs[0]), )
103

    
104
  if ieio == constants.IEIO_SCRIPT:
105
    assert len(ieioargs) == 2
106
    return (objects.Disk.FromDict(ieioargs[0]), ieioargs[1])
107

    
108
  return ieioargs
109

    
110

    
111
class MlockallRequestExecutor(http.server.HttpServerRequestExecutor):
112
  """Custom Request Executor class that ensures NodeHttpServer children are
113
  locked in ram.
114

    
115
  """
116
  def __init__(self, *args, **kwargs):
117
    utils.Mlockall()
118

    
119
    http.server.HttpServerRequestExecutor.__init__(self, *args, **kwargs)
120

    
121

    
122
class NodeHttpServer(http.server.HttpServer):
123
  """The server implementation.
124

    
125
  This class holds all methods exposed over the RPC interface.
126

    
127
  """
128
  # too many public methods, and unused args - all methods get params
129
  # due to the API
130
  # pylint: disable-msg=R0904,W0613
131
  def __init__(self, *args, **kwargs):
132
    http.server.HttpServer.__init__(self, *args, **kwargs)
133
    self.noded_pid = os.getpid()
134

    
135
  def HandleRequest(self, req):
136
    """Handle a request.
137

    
138
    """
139
    if req.request_method.upper() != http.HTTP_PUT:
140
      raise http.HttpBadRequest()
141

    
142
    path = req.request_path
143
    if path.startswith("/"):
144
      path = path[1:]
145

    
146
    method = getattr(self, "perspective_%s" % path, None)
147
    if method is None:
148
      raise http.HttpNotFound()
149

    
150
    try:
151
      result = (True, method(serializer.LoadJson(req.request_body)))
152

    
153
    except backend.RPCFail, err:
154
      # our custom failure exception; str(err) works fine if the
155
      # exception was constructed with a single argument, and in
156
      # this case, err.message == err.args[0] == str(err)
157
      result = (False, str(err))
158
    except errors.QuitGanetiException, err:
159
      # Tell parent to quit
160
      logging.info("Shutting down the node daemon, arguments: %s",
161
                   str(err.args))
162
      os.kill(self.noded_pid, signal.SIGTERM)
163
      # And return the error's arguments, which must be already in
164
      # correct tuple format
165
      result = err.args
166
    except Exception, err:
167
      logging.exception("Error in RPC call")
168
      result = (False, "Error while executing backend function: %s" % str(err))
169

    
170
    return serializer.DumpJson(result, indent=False)
171

    
172
  # the new block devices  --------------------------
173

    
174
  @staticmethod
175
  def perspective_blockdev_create(params):
176
    """Create a block device.
177

    
178
    """
179
    bdev_s, size, owner, on_primary, info = params
180
    bdev = objects.Disk.FromDict(bdev_s)
181
    if bdev is None:
182
      raise ValueError("can't unserialize data!")
183
    return backend.BlockdevCreate(bdev, size, owner, on_primary, info)
184

    
185
  @staticmethod
186
  def perspective_blockdev_remove(params):
187
    """Remove a block device.
188

    
189
    """
190
    bdev_s = params[0]
191
    bdev = objects.Disk.FromDict(bdev_s)
192
    return backend.BlockdevRemove(bdev)
193

    
194
  @staticmethod
195
  def perspective_blockdev_rename(params):
196
    """Remove a block device.
197

    
198
    """
199
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
200
    return backend.BlockdevRename(devlist)
201

    
202
  @staticmethod
203
  def perspective_blockdev_assemble(params):
204
    """Assemble a block device.
205

    
206
    """
207
    bdev_s, owner, on_primary = params
208
    bdev = objects.Disk.FromDict(bdev_s)
209
    if bdev is None:
210
      raise ValueError("can't unserialize data!")
211
    return backend.BlockdevAssemble(bdev, owner, on_primary)
212

    
213
  @staticmethod
214
  def perspective_blockdev_shutdown(params):
215
    """Shutdown a block device.
216

    
217
    """
218
    bdev_s = params[0]
219
    bdev = objects.Disk.FromDict(bdev_s)
220
    if bdev is None:
221
      raise ValueError("can't unserialize data!")
222
    return backend.BlockdevShutdown(bdev)
223

    
224
  @staticmethod
225
  def perspective_blockdev_addchildren(params):
226
    """Add a child to a mirror device.
227

    
228
    Note: this is only valid for mirror devices. It's the caller's duty
229
    to send a correct disk, otherwise we raise an error.
230

    
231
    """
232
    bdev_s, ndev_s = params
233
    bdev = objects.Disk.FromDict(bdev_s)
234
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
235
    if bdev is None or ndevs.count(None) > 0:
236
      raise ValueError("can't unserialize data!")
237
    return backend.BlockdevAddchildren(bdev, ndevs)
238

    
239
  @staticmethod
240
  def perspective_blockdev_removechildren(params):
241
    """Remove a child from a mirror device.
242

    
243
    This is only valid for mirror devices, of course. It's the callers
244
    duty to send a correct disk, otherwise we raise an error.
245

    
246
    """
247
    bdev_s, ndev_s = params
248
    bdev = objects.Disk.FromDict(bdev_s)
249
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
250
    if bdev is None or ndevs.count(None) > 0:
251
      raise ValueError("can't unserialize data!")
252
    return backend.BlockdevRemovechildren(bdev, ndevs)
253

    
254
  @staticmethod
255
  def perspective_blockdev_getmirrorstatus(params):
256
    """Return the mirror status for a list of disks.
257

    
258
    """
259
    disks = [objects.Disk.FromDict(dsk_s)
260
             for dsk_s in params]
261
    return [status.ToDict()
262
            for status in backend.BlockdevGetmirrorstatus(disks)]
263

    
264
  @staticmethod
265
  def perspective_blockdev_find(params):
266
    """Expose the FindBlockDevice functionality for a disk.
267

    
268
    This will try to find but not activate a disk.
269

    
270
    """
271
    disk = objects.Disk.FromDict(params[0])
272

    
273
    result = backend.BlockdevFind(disk)
274
    if result is None:
275
      return None
276

    
277
    return result.ToDict()
278

    
279
  @staticmethod
280
  def perspective_blockdev_snapshot(params):
281
    """Create a snapshot device.
282

    
283
    Note that this is only valid for LVM disks, if we get passed
284
    something else we raise an exception. The snapshot device can be
285
    remove by calling the generic block device remove call.
286

    
287
    """
288
    cfbd = objects.Disk.FromDict(params[0])
289
    return backend.BlockdevSnapshot(cfbd)
290

    
291
  @staticmethod
292
  def perspective_blockdev_grow(params):
293
    """Grow a stack of devices.
294

    
295
    """
296
    cfbd = objects.Disk.FromDict(params[0])
297
    amount = params[1]
298
    return backend.BlockdevGrow(cfbd, amount)
299

    
300
  @staticmethod
301
  def perspective_blockdev_close(params):
302
    """Closes the given block devices.
303

    
304
    """
305
    disks = [objects.Disk.FromDict(cf) for cf in params[1]]
306
    return backend.BlockdevClose(params[0], disks)
307

    
308
  @staticmethod
309
  def perspective_blockdev_getsize(params):
310
    """Compute the sizes of the given block devices.
311

    
312
    """
313
    disks = [objects.Disk.FromDict(cf) for cf in params[0]]
314
    return backend.BlockdevGetsize(disks)
315

    
316
  @staticmethod
317
  def perspective_blockdev_export(params):
318
    """Compute the sizes of the given block devices.
319

    
320
    """
321
    disk = objects.Disk.FromDict(params[0])
322
    dest_node, dest_path, cluster_name = params[1:]
323
    return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)
324

    
325
  # blockdev/drbd specific methods ----------
326

    
327
  @staticmethod
328
  def perspective_drbd_disconnect_net(params):
329
    """Disconnects the network connection of drbd disks.
330

    
331
    Note that this is only valid for drbd disks, so the members of the
332
    disk list must all be drbd devices.
333

    
334
    """
335
    nodes_ip, disks = params
336
    disks = [objects.Disk.FromDict(cf) for cf in disks]
337
    return backend.DrbdDisconnectNet(nodes_ip, disks)
338

    
339
  @staticmethod
340
  def perspective_drbd_attach_net(params):
341
    """Attaches the network connection of drbd disks.
342

    
343
    Note that this is only valid for drbd disks, so the members of the
344
    disk list must all be drbd devices.
345

    
346
    """
347
    nodes_ip, disks, instance_name, multimaster = params
348
    disks = [objects.Disk.FromDict(cf) for cf in disks]
349
    return backend.DrbdAttachNet(nodes_ip, disks,
350
                                     instance_name, multimaster)
351

    
352
  @staticmethod
353
  def perspective_drbd_wait_sync(params):
354
    """Wait until DRBD disks are synched.
355

    
356
    Note that this is only valid for drbd disks, so the members of the
357
    disk list must all be drbd devices.
358

    
359
    """
360
    nodes_ip, disks = params
361
    disks = [objects.Disk.FromDict(cf) for cf in disks]
362
    return backend.DrbdWaitSync(nodes_ip, disks)
363

    
364
  # export/import  --------------------------
365

    
366
  @staticmethod
367
  def perspective_finalize_export(params):
368
    """Expose the finalize export functionality.
369

    
370
    """
371
    instance = objects.Instance.FromDict(params[0])
372

    
373
    snap_disks = []
374
    for disk in params[1]:
375
      if isinstance(disk, bool):
376
        snap_disks.append(disk)
377
      else:
378
        snap_disks.append(objects.Disk.FromDict(disk))
379

    
380
    return backend.FinalizeExport(instance, snap_disks)
381

    
382
  @staticmethod
383
  def perspective_export_info(params):
384
    """Query information about an existing export on this node.
385

    
386
    The given path may not contain an export, in which case we return
387
    None.
388

    
389
    """
390
    path = params[0]
391
    return backend.ExportInfo(path)
392

    
393
  @staticmethod
394
  def perspective_export_list(params):
395
    """List the available exports on this node.
396

    
397
    Note that as opposed to export_info, which may query data about an
398
    export in any path, this only queries the standard Ganeti path
399
    (constants.EXPORT_DIR).
400

    
401
    """
402
    return backend.ListExports()
403

    
404
  @staticmethod
405
  def perspective_export_remove(params):
406
    """Remove an export.
407

    
408
    """
409
    export = params[0]
410
    return backend.RemoveExport(export)
411

    
412
  # volume  --------------------------
413

    
414
  @staticmethod
415
  def perspective_lv_list(params):
416
    """Query the list of logical volumes in a given volume group.
417

    
418
    """
419
    vgname = params[0]
420
    return backend.GetVolumeList(vgname)
421

    
422
  @staticmethod
423
  def perspective_vg_list(params):
424
    """Query the list of volume groups.
425

    
426
    """
427
    return backend.ListVolumeGroups()
428

    
429
  # Storage --------------------------
430

    
431
  @staticmethod
432
  def perspective_storage_list(params):
433
    """Get list of storage units.
434

    
435
    """
436
    (su_name, su_args, name, fields) = params
437
    return storage.GetStorage(su_name, *su_args).List(name, fields)
438

    
439
  @staticmethod
440
  def perspective_storage_modify(params):
441
    """Modify a storage unit.
442

    
443
    """
444
    (su_name, su_args, name, changes) = params
445
    return storage.GetStorage(su_name, *su_args).Modify(name, changes)
446

    
447
  @staticmethod
448
  def perspective_storage_execute(params):
449
    """Execute an operation on a storage unit.
450

    
451
    """
452
    (su_name, su_args, name, op) = params
453
    return storage.GetStorage(su_name, *su_args).Execute(name, op)
454

    
455
  # bridge  --------------------------
456

    
457
  @staticmethod
458
  def perspective_bridges_exist(params):
459
    """Check if all bridges given exist on this node.
460

    
461
    """
462
    bridges_list = params[0]
463
    return backend.BridgesExist(bridges_list)
464

    
465
  # instance  --------------------------
466

    
467
  @staticmethod
468
  def perspective_instance_os_add(params):
469
    """Install an OS on a given instance.
470

    
471
    """
472
    inst_s = params[0]
473
    inst = objects.Instance.FromDict(inst_s)
474
    reinstall = params[1]
475
    debug = params[2]
476
    return backend.InstanceOsAdd(inst, reinstall, debug)
477

    
478
  @staticmethod
479
  def perspective_instance_run_rename(params):
480
    """Runs the OS rename script for an instance.
481

    
482
    """
483
    inst_s, old_name, debug = params
484
    inst = objects.Instance.FromDict(inst_s)
485
    return backend.RunRenameInstance(inst, old_name, debug)
486

    
487
  @staticmethod
488
  def perspective_instance_shutdown(params):
489
    """Shutdown an instance.
490

    
491
    """
492
    instance = objects.Instance.FromDict(params[0])
493
    timeout = params[1]
494
    return backend.InstanceShutdown(instance, timeout)
495

    
496
  @staticmethod
497
  def perspective_instance_start(params):
498
    """Start an instance.
499

    
500
    """
501
    instance = objects.Instance.FromDict(params[0])
502
    return backend.StartInstance(instance)
503

    
504
  @staticmethod
505
  def perspective_migration_info(params):
506
    """Gather information about an instance to be migrated.
507

    
508
    """
509
    instance = objects.Instance.FromDict(params[0])
510
    return backend.MigrationInfo(instance)
511

    
512
  @staticmethod
513
  def perspective_accept_instance(params):
514
    """Prepare the node to accept an instance.
515

    
516
    """
517
    instance, info, target = params
518
    instance = objects.Instance.FromDict(instance)
519
    return backend.AcceptInstance(instance, info, target)
520

    
521
  @staticmethod
522
  def perspective_finalize_migration(params):
523
    """Finalize the instance migration.
524

    
525
    """
526
    instance, info, success = params
527
    instance = objects.Instance.FromDict(instance)
528
    return backend.FinalizeMigration(instance, info, success)
529

    
530
  @staticmethod
531
  def perspective_instance_migrate(params):
532
    """Migrates an instance.
533

    
534
    """
535
    instance, target, live = params
536
    instance = objects.Instance.FromDict(instance)
537
    return backend.MigrateInstance(instance, target, live)
538

    
539
  @staticmethod
540
  def perspective_instance_reboot(params):
541
    """Reboot an instance.
542

    
543
    """
544
    instance = objects.Instance.FromDict(params[0])
545
    reboot_type = params[1]
546
    shutdown_timeout = params[2]
547
    return backend.InstanceReboot(instance, reboot_type, shutdown_timeout)
548

    
549
  @staticmethod
550
  def perspective_instance_info(params):
551
    """Query instance information.
552

    
553
    """
554
    return backend.GetInstanceInfo(params[0], params[1])
555

    
556
  @staticmethod
557
  def perspective_instance_migratable(params):
558
    """Query whether the specified instance can be migrated.
559

    
560
    """
561
    instance = objects.Instance.FromDict(params[0])
562
    return backend.GetInstanceMigratable(instance)
563

    
564
  @staticmethod
565
  def perspective_all_instances_info(params):
566
    """Query information about all instances.
567

    
568
    """
569
    return backend.GetAllInstancesInfo(params[0])
570

    
571
  @staticmethod
572
  def perspective_instance_list(params):
573
    """Query the list of running instances.
574

    
575
    """
576
    return backend.GetInstanceList(params[0])
577

    
578
  # node --------------------------
579

    
580
  @staticmethod
581
  def perspective_node_tcp_ping(params):
582
    """Do a TcpPing on the remote node.
583

    
584
    """
585
    return utils.TcpPing(params[1], params[2], timeout=params[3],
586
                         live_port_needed=params[4], source=params[0])
587

    
588
  @staticmethod
589
  def perspective_node_has_ip_address(params):
590
    """Checks if a node has the given ip address.
591

    
592
    """
593
    return utils.OwnIpAddress(params[0])
594

    
595
  @staticmethod
596
  def perspective_node_info(params):
597
    """Query node information.
598

    
599
    """
600
    vgname, hypervisor_type = params
601
    return backend.GetNodeInfo(vgname, hypervisor_type)
602

    
603
  @staticmethod
604
  def perspective_node_add(params):
605
    """Complete the registration of this node in the cluster.
606

    
607
    """
608
    return backend.AddNode(params[0], params[1], params[2],
609
                           params[3], params[4], params[5])
610

    
611
  @staticmethod
612
  def perspective_node_verify(params):
613
    """Run a verify sequence on this node.
614

    
615
    """
616
    return backend.VerifyNode(params[0], params[1])
617

    
618
  @staticmethod
619
  def perspective_node_start_master(params):
620
    """Promote this node to master status.
621

    
622
    """
623
    return backend.StartMaster(params[0], params[1])
624

    
625
  @staticmethod
626
  def perspective_node_stop_master(params):
627
    """Demote this node from master status.
628

    
629
    """
630
    return backend.StopMaster(params[0])
631

    
632
  @staticmethod
633
  def perspective_node_leave_cluster(params):
634
    """Cleanup after leaving a cluster.
635

    
636
    """
637
    return backend.LeaveCluster(params[0])
638

    
639
  @staticmethod
640
  def perspective_node_volumes(params):
641
    """Query the list of all logical volume groups.
642

    
643
    """
644
    return backend.NodeVolumes()
645

    
646
  @staticmethod
647
  def perspective_node_demote_from_mc(params):
648
    """Demote a node from the master candidate role.
649

    
650
    """
651
    return backend.DemoteFromMC()
652

    
653

    
654
  @staticmethod
655
  def perspective_node_powercycle(params):
656
    """Tries to powercycle the nod.
657

    
658
    """
659
    hypervisor_type = params[0]
660
    return backend.PowercycleNode(hypervisor_type)
661

    
662

    
663
  # cluster --------------------------
664

    
665
  @staticmethod
666
  def perspective_version(params):
667
    """Query version information.
668

    
669
    """
670
    return constants.PROTOCOL_VERSION
671

    
672
  @staticmethod
673
  def perspective_upload_file(params):
674
    """Upload a file.
675

    
676
    Note that the backend implementation imposes strict rules on which
677
    files are accepted.
678

    
679
    """
680
    return backend.UploadFile(*params)
681

    
682
  @staticmethod
683
  def perspective_master_info(params):
684
    """Query master information.
685

    
686
    """
687
    return backend.GetMasterInfo()
688

    
689
  @staticmethod
690
  def perspective_write_ssconf_files(params):
691
    """Write ssconf files.
692

    
693
    """
694
    (values,) = params
695
    return backend.WriteSsconfFiles(values)
696

    
697
  # os -----------------------
698

    
699
  @staticmethod
700
  def perspective_os_diagnose(params):
701
    """Query detailed information about existing OSes.
702

    
703
    """
704
    return backend.DiagnoseOS()
705

    
706
  @staticmethod
707
  def perspective_os_get(params):
708
    """Query information about a given OS.
709

    
710
    """
711
    name = params[0]
712
    os_obj = backend.OSFromDisk(name)
713
    return os_obj.ToDict()
714

    
715
  # hooks -----------------------
716

    
717
  @staticmethod
718
  def perspective_hooks_runner(params):
719
    """Run hook scripts.
720

    
721
    """
722
    hpath, phase, env = params
723
    hr = backend.HooksRunner()
724
    return hr.RunHooks(hpath, phase, env)
725

    
726
  # iallocator -----------------
727

    
728
  @staticmethod
729
  def perspective_iallocator_runner(params):
730
    """Run an iallocator script.
731

    
732
    """
733
    name, idata = params
734
    iar = backend.IAllocatorRunner()
735
    return iar.Run(name, idata)
736

    
737
  # test -----------------------
738

    
739
  @staticmethod
740
  def perspective_test_delay(params):
741
    """Run test delay.
742

    
743
    """
744
    duration = params[0]
745
    status, rval = utils.TestDelay(duration)
746
    if not status:
747
      raise backend.RPCFail(rval)
748
    return rval
749

    
750
  # file storage ---------------
751

    
752
  @staticmethod
753
  def perspective_file_storage_dir_create(params):
754
    """Create the file storage directory.
755

    
756
    """
757
    file_storage_dir = params[0]
758
    return backend.CreateFileStorageDir(file_storage_dir)
759

    
760
  @staticmethod
761
  def perspective_file_storage_dir_remove(params):
762
    """Remove the file storage directory.
763

    
764
    """
765
    file_storage_dir = params[0]
766
    return backend.RemoveFileStorageDir(file_storage_dir)
767

    
768
  @staticmethod
769
  def perspective_file_storage_dir_rename(params):
770
    """Rename the file storage directory.
771

    
772
    """
773
    old_file_storage_dir = params[0]
774
    new_file_storage_dir = params[1]
775
    return backend.RenameFileStorageDir(old_file_storage_dir,
776
                                        new_file_storage_dir)
777

    
778
  # jobs ------------------------
779

    
780
  @staticmethod
781
  @_RequireJobQueueLock
782
  def perspective_jobqueue_update(params):
783
    """Update job queue.
784

    
785
    """
786
    (file_name, content) = params
787
    return backend.JobQueueUpdate(file_name, content)
788

    
789
  @staticmethod
790
  @_RequireJobQueueLock
791
  def perspective_jobqueue_purge(params):
792
    """Purge job queue.
793

    
794
    """
795
    return backend.JobQueuePurge()
796

    
797
  @staticmethod
798
  @_RequireJobQueueLock
799
  def perspective_jobqueue_rename(params):
800
    """Rename a job queue file.
801

    
802
    """
803
    # TODO: What if a file fails to rename?
804
    return [backend.JobQueueRename(old, new) for old, new in params]
805

    
806
  # hypervisor ---------------
807

    
808
  @staticmethod
809
  def perspective_hypervisor_validate_params(params):
810
    """Validate the hypervisor parameters.
811

    
812
    """
813
    (hvname, hvparams) = params
814
    return backend.ValidateHVParams(hvname, hvparams)
815

    
816
  # Crypto
817

    
818
  @staticmethod
819
  def perspective_x509_cert_create(params):
820
    """Creates a new X509 certificate for SSL/TLS.
821

    
822
    """
823
    (validity, ) = params
824
    return backend.CreateX509Certificate(validity)
825

    
826
  @staticmethod
827
  def perspective_x509_cert_remove(params):
828
    """Removes a X509 certificate.
829

    
830
    """
831
    (name, ) = params
832
    return backend.RemoveX509Certificate(name)
833

    
834
  # Import and export
835

    
836
  @staticmethod
837
  def perspective_import_start(params):
838
    """Starts an import daemon.
839

    
840
    """
841
    (opts_s, instance, dest, dest_args) = params
842

    
843
    opts = objects.ImportExportOptions.FromDict(opts_s)
844

    
845
    return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts,
846
                                           None, None,
847
                                           objects.Instance.FromDict(instance),
848
                                           dest,
849
                                           _DecodeImportExportIO(dest,
850
                                                                 dest_args))
851

    
852
  @staticmethod
853
  def perspective_export_start(params):
854
    """Starts an export daemon.
855

    
856
    """
857
    (opts_s, host, port, instance, source, source_args) = params
858

    
859
    opts = objects.ImportExportOptions.FromDict(opts_s)
860

    
861
    return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts,
862
                                           host, port,
863
                                           objects.Instance.FromDict(instance),
864
                                           source,
865
                                           _DecodeImportExportIO(source,
866
                                                                 source_args))
867

    
868
  @staticmethod
869
  def perspective_impexp_status(params):
870
    """Retrieves the status of an import or export daemon.
871

    
872
    """
873
    return backend.GetImportExportStatus(params[0])
874

    
875
  @staticmethod
876
  def perspective_impexp_abort(params):
877
    """Aborts an import or export.
878

    
879
    """
880
    return backend.AbortImportExport(params[0])
881

    
882
  @staticmethod
883
  def perspective_impexp_cleanup(params):
884
    """Cleans up after an import or export.
885

    
886
    """
887
    return backend.CleanupImportExport(params[0])
888

    
889

    
890
def CheckNoded(_, args):
891
  """Initial checks whether to run or exit with a failure.
892

    
893
  """
894
  if args: # noded doesn't take any arguments
895
    print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
896
                          sys.argv[0])
897
    sys.exit(constants.EXIT_FAILURE)
898

    
899

    
900
def ExecNoded(options, _):
901
  """Main node daemon function, executed with the PID file held.
902

    
903
  """
904
  if options.mlock:
905
    utils.Mlockall()
906
    request_executor_class = MlockallRequestExecutor
907
  else:
908
    request_executor_class = http.server.HttpServerRequestExecutor
909

    
910
  # Read SSL certificate
911
  if options.ssl:
912
    ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
913
                                    ssl_cert_path=options.ssl_cert)
914
  else:
915
    ssl_params = None
916

    
917
  err = _PrepareQueueLock()
918
  if err is not None:
919
    # this might be some kind of file-system/permission error; while
920
    # this breaks the job queue functionality, we shouldn't prevent
921
    # startup of the whole node daemon because of this
922
    logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)
923

    
924
  mainloop = daemon.Mainloop()
925
  server = NodeHttpServer(mainloop, options.bind_address, options.port,
926
                          ssl_params=ssl_params, ssl_verify_peer=True,
927
                          request_executor_class=request_executor_class)
928
  server.Start()
929
  try:
930
    mainloop.Run()
931
  finally:
932
    server.Stop()
933

    
934

    
935
def main():
936
  """Main function for the node daemon.
937

    
938
  """
939
  parser = OptionParser(description="Ganeti node daemon",
940
                        usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
941
                        version="%%prog (ganeti) %s" %
942
                        constants.RELEASE_VERSION)
943
  parser.add_option("--no-mlock", dest="mlock",
944
                    help="Do not mlock the node memory in ram",
945
                    default=True, action="store_false")
946

    
947
  dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
948
  dirs.append((constants.LOG_OS_DIR, 0750))
949
  dirs.append((constants.LOCK_DIR, 1777))
950
  dirs.append((constants.CRYPTO_KEYS_DIR, constants.CRYPTO_KEYS_DIR_MODE))
951
  dirs.append((constants.IMPORT_EXPORT_DIR, constants.IMPORT_EXPORT_DIR_MODE))
952
  daemon.GenericMain(constants.NODED, parser, dirs, CheckNoded, ExecNoded,
953
                     default_ssl_cert=constants.NODED_CERT_FILE,
954
                     default_ssl_key=constants.NODED_CERT_FILE,
955
                     console_logging=True)
956

    
957

    
958
if __name__ == '__main__':
959
  main()