Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-noded @ 1abbbbe2

History | View | Annotate | Download (16.4 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti node daemon"""
23

    
24
# functions in this module need to have a given name structure, so:
25
# pylint: disable-msg=C0103
26

    
27
import os
28
import sys
29
import resource
30
import traceback
31
import BaseHTTPServer
32
import simplejson
33
import errno
34

    
35
from optparse import OptionParser
36

    
37

    
38
from ganeti import backend
39
from ganeti import logger
40
from ganeti import constants
41
from ganeti import objects
42
from ganeti import errors
43
from ganeti import ssconf
44
from ganeti import utils
45

    
46

    
47
class ServerObject(BaseHTTPServer.BaseHTTPRequestHandler):
48
  """The server implementation.
49

    
50
  This class holds all methods exposed over the RPC interface.
51

    
52
  """
53
  def do_PUT(self):
54
    """Handle a post request.
55

    
56
    """
57
    path = self.path
58
    if path.startswith("/"):
59
      path = path[1:]
60
    mname = "perspective_%s" % path
61
    if not hasattr(self, mname):
62
      self.send_error(404)
63
      return False
64

    
65
    method = getattr(self, mname)
66
    try:
67
      body_length = int(self.headers.get('Content-Length', '0'))
68
    except ValueError:
69
      self.send_error(400, 'No Content-Length header or invalid format')
70
      return False
71

    
72
    try:
73
      body = self.rfile.read(body_length)
74
    except socket.error, err:
75
      logger.Error("Socket error while reading: %s" % str(err))
76
      return
77
    try:
78
      params = simplejson.loads(body)
79
      result = method(params)
80
      payload = simplejson.dumps(result)
81
    except Exception, err:
82
      self.send_error(500, "Error: %s" % str(err))
83
      return False
84
    self.send_response(200)
85
    self.send_header('Content-Length', str(len(payload)))
86
    self.end_headers()
87
    self.wfile.write(payload)
88
    return True
89

    
90
  def log_message(self, format, *args):
91
    """Log a request to the log.
92

    
93
    This is the same as the parent, we just log somewhere else.
94

    
95
    """
96
    msg = ("%s - - [%s] %s\n" %
97
           (self.address_string(),
98
            self.log_date_time_string(),
99
            format % args))
100
    logger.Debug(msg)
101

    
102
  # the new block devices  --------------------------
103

    
104
  @staticmethod
105
  def perspective_blockdev_create(params):
106
    """Create a block device.
107

    
108
    """
109
    bdev_s, size, owner, on_primary, info = params
110
    bdev = objects.Disk.FromDict(bdev_s)
111
    if bdev is None:
112
      raise ValueError("can't unserialize data!")
113
    return backend.CreateBlockDevice(bdev, size, owner, on_primary, info)
114

    
115
  @staticmethod
116
  def perspective_blockdev_remove(params):
117
    """Remove a block device.
118

    
119
    """
120
    bdev_s = params[0]
121
    bdev = objects.Disk.FromDict(bdev_s)
122
    return backend.RemoveBlockDevice(bdev)
123

    
124
  @staticmethod
125
  def perspective_blockdev_rename(params):
126
    """Remove a block device.
127

    
128
    """
129
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
130
    return backend.RenameBlockDevices(devlist)
131

    
132
  @staticmethod
133
  def perspective_blockdev_assemble(params):
134
    """Assemble a block device.
135

    
136
    """
137
    bdev_s, owner, on_primary = params
138
    bdev = objects.Disk.FromDict(bdev_s)
139
    if bdev is None:
140
      raise ValueError("can't unserialize data!")
141
    return backend.AssembleBlockDevice(bdev, owner, on_primary)
142

    
143
  @staticmethod
144
  def perspective_blockdev_shutdown(params):
145
    """Shutdown a block device.
146

    
147
    """
148
    bdev_s = params[0]
149
    bdev = objects.Disk.FromDict(bdev_s)
150
    if bdev is None:
151
      raise ValueError("can't unserialize data!")
152
    return backend.ShutdownBlockDevice(bdev)
153

    
154
  @staticmethod
155
  def perspective_blockdev_addchildren(params):
156
    """Add a child to a mirror device.
157

    
158
    Note: this is only valid for mirror devices. It's the caller's duty
159
    to send a correct disk, otherwise we raise an error.
160

    
161
    """
162
    bdev_s, ndev_s = params
163
    bdev = objects.Disk.FromDict(bdev_s)
164
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
165
    if bdev is None or ndevs.count(None) > 0:
166
      raise ValueError("can't unserialize data!")
167
    return backend.MirrorAddChildren(bdev, ndevs)
168

    
169
  @staticmethod
170
  def perspective_blockdev_removechildren(params):
171
    """Remove a child from a mirror device.
172

    
173
    This is only valid for mirror devices, of course. It's the callers
174
    duty to send a correct disk, otherwise we raise an error.
175

    
176
    """
177
    bdev_s, ndev_s = params
178
    bdev = objects.Disk.FromDict(bdev_s)
179
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
180
    if bdev is None or ndevs.count(None) > 0:
181
      raise ValueError("can't unserialize data!")
182
    return backend.MirrorRemoveChildren(bdev, ndevs)
183

    
184
  @staticmethod
185
  def perspective_blockdev_getmirrorstatus(params):
186
    """Return the mirror status for a list of disks.
187

    
188
    """
189
    disks = [objects.Disk.FromDict(dsk_s)
190
            for dsk_s in params]
191
    return backend.GetMirrorStatus(disks)
192

    
193
  @staticmethod
194
  def perspective_blockdev_find(params):
195
    """Expose the FindBlockDevice functionality for a disk.
196

    
197
    This will try to find but not activate a disk.
198

    
199
    """
200
    disk = objects.Disk.FromDict(params[0])
201
    return backend.FindBlockDevice(disk)
202

    
203
  @staticmethod
204
  def perspective_blockdev_snapshot(params):
205
    """Create a snapshot device.
206

    
207
    Note that this is only valid for LVM disks, if we get passed
208
    something else we raise an exception. The snapshot device can be
209
    remove by calling the generic block device remove call.
210

    
211
    """
212
    cfbd = objects.Disk.FromDict(params[0])
213
    return backend.SnapshotBlockDevice(cfbd)
214

    
215
  # export/import  --------------------------
216

    
217
  @staticmethod
218
  def perspective_snapshot_export(params):
219
    """Export a given snapshot.
220

    
221
    """
222
    disk = objects.Disk.FromDict(params[0])
223
    dest_node = params[1]
224
    instance = objects.Instance.FromDict(params[2])
225
    return backend.ExportSnapshot(disk, dest_node, instance)
226

    
227
  @staticmethod
228
  def perspective_finalize_export(params):
229
    """Expose the finalize export functionality.
230

    
231
    """
232
    instance = objects.Instance.FromDict(params[0])
233
    snap_disks = [objects.Disk.FromDict(str_data)
234
                  for str_data in params[1]]
235
    return backend.FinalizeExport(instance, snap_disks)
236

    
237
  @staticmethod
238
  def perspective_export_info(params):
239
    """Query information about an existing export on this node.
240

    
241
    The given path may not contain an export, in which case we return
242
    None.
243

    
244
    """
245
    path = params[0]
246
    einfo = backend.ExportInfo(path)
247
    if einfo is None:
248
      return einfo
249
    return einfo.Dumps()
250

    
251
  @staticmethod
252
  def perspective_export_list(params):
253
    """List the available exports on this node.
254

    
255
    Note that as opposed to export_info, which may query data about an
256
    export in any path, this only queries the standard Ganeti path
257
    (constants.EXPORT_DIR).
258

    
259
    """
260
    return backend.ListExports()
261

    
262
  @staticmethod
263
  def perspective_export_remove(params):
264
    """Remove an export.
265

    
266
    """
267
    export = params[0]
268
    return backend.RemoveExport(export)
269

    
270
  # volume  --------------------------
271

    
272
  @staticmethod
273
  def perspective_volume_list(params):
274
    """Query the list of logical volumes in a given volume group.
275

    
276
    """
277
    vgname = params[0]
278
    return backend.GetVolumeList(vgname)
279

    
280
  @staticmethod
281
  def perspective_vg_list(params):
282
    """Query the list of volume groups.
283

    
284
    """
285
    return backend.ListVolumeGroups()
286

    
287
  # bridge  --------------------------
288

    
289
  @staticmethod
290
  def perspective_bridges_exist(params):
291
    """Check if all bridges given exist on this node.
292

    
293
    """
294
    bridges_list = params[0]
295
    return backend.BridgesExist(bridges_list)
296

    
297
  # instance  --------------------------
298

    
299
  @staticmethod
300
  def perspective_instance_os_add(params):
301
    """Install an OS on a given instance.
302

    
303
    """
304
    inst_s, os_disk, swap_disk = params
305
    inst = objects.Instance.FromDict(inst_s)
306
    return backend.AddOSToInstance(inst, os_disk, swap_disk)
307

    
308
  @staticmethod
309
  def perspective_instance_run_rename(params):
310
    """Runs the OS rename script for an instance.
311

    
312
    """
313
    inst_s, old_name, os_disk, swap_disk = params
314
    inst = objects.Instance.FromDict(inst_s)
315
    return backend.RunRenameInstance(inst, old_name, os_disk, swap_disk)
316

    
317
  @staticmethod
318
  def perspective_instance_os_import(params):
319
    """Run the import function of an OS onto a given instance.
320

    
321
    """
322
    inst_s, os_disk, swap_disk, src_node, src_image = params
323
    inst = objects.Instance.FromDict(inst_s)
324
    return backend.ImportOSIntoInstance(inst, os_disk, swap_disk,
325
                                        src_node, src_image)
326

    
327
  @staticmethod
328
  def perspective_instance_shutdown(params):
329
    """Shutdown an instance.
330

    
331
    """
332
    instance = objects.Instance.FromDict(params[0])
333
    return backend.ShutdownInstance(instance)
334

    
335
  @staticmethod
336
  def perspective_instance_start(params):
337
    """Start an instance.
338

    
339
    """
340
    instance = objects.Instance.FromDict(params[0])
341
    extra_args = params[1]
342
    return backend.StartInstance(instance, extra_args)
343

    
344
  @staticmethod
345
  def perspective_instance_reboot(params):
346
    """Reboot an instance.
347

    
348
    """
349
    instance = objects.Instance.FromDict(params[0])
350
    reboot_type = params[1]
351
    extra_args = params[2]
352
    return backend.RebootInstance(instance, reboot_type, extra_args)
353

    
354
  @staticmethod
355
  def perspective_instance_info(params):
356
    """Query instance information.
357

    
358
    """
359
    return backend.GetInstanceInfo(params[0])
360

    
361
  @staticmethod
362
  def perspective_all_instances_info(params):
363
    """Query information about all instances.
364

    
365
    """
366
    return backend.GetAllInstancesInfo()
367

    
368
  @staticmethod
369
  def perspective_instance_list(params):
370
    """Query the list of running instances.
371

    
372
    """
373
    return backend.GetInstanceList()
374

    
375
  # node --------------------------
376

    
377
  @staticmethod
378
  def perspective_node_tcp_ping(params):
379
    """Do a TcpPing on the remote node.
380

    
381
    """
382
    return utils.TcpPing(params[1], params[2], timeout=params[3],
383
                         live_port_needed=params[4], source=params[0])
384

    
385
  @staticmethod
386
  def perspective_node_info(params):
387
    """Query node information.
388

    
389
    """
390
    vgname = params[0]
391
    return backend.GetNodeInfo(vgname)
392

    
393
  @staticmethod
394
  def perspective_node_add(params):
395
    """Complete the registration of this node in the cluster.
396

    
397
    """
398
    return backend.AddNode(params[0], params[1], params[2],
399
                           params[3], params[4], params[5])
400

    
401
  @staticmethod
402
  def perspective_node_verify(params):
403
    """Run a verify sequence on this node.
404

    
405
    """
406
    return backend.VerifyNode(params[0])
407

    
408
  @staticmethod
409
  def perspective_node_start_master(params):
410
    """Promote this node to master status.
411

    
412
    """
413
    return backend.StartMaster()
414

    
415
  @staticmethod
416
  def perspective_node_stop_master(params):
417
    """Demote this node from master status.
418

    
419
    """
420
    return backend.StopMaster()
421

    
422
  @staticmethod
423
  def perspective_node_leave_cluster(params):
424
    """Cleanup after leaving a cluster.
425

    
426
    """
427
    return backend.LeaveCluster()
428

    
429
  @staticmethod
430
  def perspective_node_volumes(params):
431
    """Query the list of all logical volume groups.
432

    
433
    """
434
    return backend.NodeVolumes()
435

    
436
  # cluster --------------------------
437

    
438
  @staticmethod
439
  def perspective_version(params):
440
    """Query version information.
441

    
442
    """
443
    return constants.PROTOCOL_VERSION
444

    
445
  @staticmethod
446
  def perspective_upload_file(params):
447
    """Upload a file.
448

    
449
    Note that the backend implementation imposes strict rules on which
450
    files are accepted.
451

    
452
    """
453
    return backend.UploadFile(*params)
454

    
455

    
456
  # os -----------------------
457

    
458
  @staticmethod
459
  def perspective_os_diagnose(params):
460
    """Query detailed information about existing OSes.
461

    
462
    """
463
    return [os.ToDict() for os in backend.DiagnoseOS()]
464

    
465
  @staticmethod
466
  def perspective_os_get(params):
467
    """Query information about a given OS.
468

    
469
    """
470
    name = params[0]
471
    try:
472
      os_obj = backend.OSFromDisk(name)
473
    except errors.InvalidOS, err:
474
      os_obj = objects.OS.FromInvalidOS(err)
475
    return os_obj.ToDict()
476

    
477
  # hooks -----------------------
478

    
479
  @staticmethod
480
  def perspective_hooks_runner(params):
481
    """Run hook scripts.
482

    
483
    """
484
    hpath, phase, env = params
485
    hr = backend.HooksRunner()
486
    return hr.RunHooks(hpath, phase, env)
487

    
488
  # test -----------------------
489

    
490
  @staticmethod
491
  def perspective_test_delay(params):
492
    """Run test delay.
493

    
494
    """
495
    duration = params[0]
496
    return utils.TestDelay(duration)
497

    
498

    
499
def ParseOptions():
500
  """Parse the command line options.
501

    
502
  Returns:
503
    (options, args) as from OptionParser.parse_args()
504

    
505
  """
506
  parser = OptionParser(description="Ganeti node daemon",
507
                        usage="%prog [-f] [-d]",
508
                        version="%%prog (ganeti) %s" %
509
                        constants.RELEASE_VERSION)
510

    
511
  parser.add_option("-f", "--foreground", dest="fork",
512
                    help="Don't detach from the current terminal",
513
                    default=True, action="store_false")
514
  parser.add_option("-d", "--debug", dest="debug",
515
                    help="Enable some debug messages",
516
                    default=False, action="store_true")
517
  options, args = parser.parse_args()
518
  return options, args
519

    
520

    
521
def main():
522
  """Main function for the node daemon.
523

    
524
  """
525
  options, args = ParseOptions()
526
  utils.debug = options.debug
527
  for fname in (constants.SSL_CERT_FILE,):
528
    if not os.path.isfile(fname):
529
      print "config %s not there, will not run." % fname
530
      sys.exit(5)
531

    
532
  try:
533
    ss = ssconf.SimpleStore()
534
    port = ss.GetNodeDaemonPort()
535
    pwdata = ss.GetNodeDaemonPassword()
536
  except errors.ConfigurationError, err:
537
    print "Cluster configuration incomplete: '%s'" % str(err)
538
    sys.exit(5)
539

    
540
  # create /var/run/ganeti if not existing, in order to take care of
541
  # tmpfs /var/run
542
  if not os.path.exists(constants.BDEV_CACHE_DIR):
543
    try:
544
      os.mkdir(constants.BDEV_CACHE_DIR, 0755)
545
    except EnvironmentError, err:
546
      if err.errno != errno.EEXIST:
547
        print ("Node setup wrong, cannot create directory %s: %s" %
548
               (constants.BDEV_CACHE_DIR, err))
549
        sys.exit(5)
550
  if not os.path.isdir(constants.BDEV_CACHE_DIR):
551
    print ("Node setup wrong, %s is not a directory" %
552
           constants.BDEV_CACHE_DIR)
553
    sys.exit(5)
554

    
555
  # become a daemon
556
  if options.fork:
557
    createDaemon()
558

    
559
  logger.SetupLogging(twisted_workaround=True, debug=options.debug,
560
                      program="ganeti-noded")
561

    
562
  httpd = BaseHTTPServer.HTTPServer(('', port), ServerObject)
563
  httpd.serve_forever()
564

    
565

    
566
def createDaemon():
567
  """Detach a process from the controlling terminal and run it in the
568
  background as a daemon.
569

    
570
  """
571
  UMASK = 077
572
  WORKDIR = "/"
573
  # Default maximum for the number of available file descriptors.
574
  if 'SC_OPEN_MAX' in os.sysconf_names:
575
    try:
576
      MAXFD = os.sysconf('SC_OPEN_MAX')
577
      if MAXFD < 0:
578
        MAXFD = 1024
579
    except OSError:
580
      MAXFD = 1024
581
  else:
582
    MAXFD = 1024
583
  # The standard I/O file descriptors are redirected to /dev/null by default.
584
  #REDIRECT_TO = getattr(os, "devnull", "/dev/null")
585
  REDIRECT_TO = constants.LOG_NODESERVER
586
  try:
587
    pid = os.fork()
588
  except OSError, e:
589
    raise Exception("%s [%d]" % (e.strerror, e.errno))
590
  if (pid == 0):  # The first child.
591
    os.setsid()
592
    try:
593
      pid = os.fork() # Fork a second child.
594
    except OSError, e:
595
      raise Exception("%s [%d]" % (e.strerror, e.errno))
596
    if (pid == 0):  # The second child.
597
      os.chdir(WORKDIR)
598
      os.umask(UMASK)
599
    else:
600
      # exit() or _exit()?  See below.
601
      os._exit(0) # Exit parent (the first child) of the second child.
602
  else:
603
    os._exit(0) # Exit parent of the first child.
604
  maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
605
  if (maxfd == resource.RLIM_INFINITY):
606
    maxfd = MAXFD
607

    
608
  # Iterate through and close all file descriptors.
609
  for fd in range(0, maxfd):
610
    try:
611
      os.close(fd)
612
    except OSError: # ERROR, fd wasn't open to begin with (ignored)
613
      pass
614
  os.open(REDIRECT_TO, os.O_RDWR|os.O_CREAT|os.O_APPEND, 0600)
615
  # Duplicate standard input to standard output and standard error.
616
  os.dup2(0, 1)     # standard output (1)
617
  os.dup2(0, 2)     # standard error (2)
618
  return(0)
619

    
620

    
621
if __name__ == '__main__':
622
  main()