Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 38206f3c

History | View | Annotate | Download (17.7 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script to show add a new node to the cluster
23

24
"""
25

    
26
# pylint: disable-msg=C0103
27

    
28
import os
29
import socket
30
import httplib
31

    
32
import simplejson
33

    
34
from ganeti import logger
35
from ganeti import utils
36
from ganeti import objects
37
from ganeti import ssconf
38

    
39

    
40
class NodeController:
41
  """Node-handling class.
42

43
  For each node that we speak with, we create an instance of this
44
  class, so that we have a safe place to store the details of this
45
  individual call.
46

47
  """
48
  def __init__(self, parent, node):
49
    self.parent = parent
50
    self.node = node
51
    self.failed = False
52

    
53
    self.http_conn = hc = httplib.HTTPConnection(node, self.parent.port)
54
    try:
55
      hc.connect()
56
      hc.putrequest('PUT', "/%s" % self.parent.procedure,
57
                    skip_accept_encoding=True)
58
      hc.putheader('Content-Length', str(len(parent.body)))
59
      hc.endheaders()
60
      hc.send(parent.body)
61
    except socket.error, err:
62
      logger.Error("Error connecting to %s: %s" % (node, str(err)))
63
      self.failed = True
64

    
65
  def get_response(self):
66
    """Try to process the response from the node.
67

68
    """
69
    if self.failed:
70
      # we already failed in connect
71
      return False
72
    resp = self.http_conn.getresponse()
73
    if resp.status != 200:
74
      return False
75
    try:
76
      length = int(resp.getheader('Content-Length', '0'))
77
    except ValueError:
78
      return False
79
    if not length:
80
      logger.Error("Zero-length reply from %s" % self.node)
81
      return False
82
    payload = resp.read(length)
83
    unload = simplejson.loads(payload)
84
    return unload
85

    
86

    
87
class Client:
88
  """RPC Client class.
89

90
  This class, given a (remote) method name, a list of parameters and a
91
  list of nodes, will contact (in parallel) all nodes, and return a
92
  dict of results (key: node name, value: result).
93

94
  One current bug is that generic failure is still signalled by
95
  'False' result, which is not good. This overloading of values can
96
  cause bugs.
97

98
  """
99
  result_set = False
100
  result = False
101
  allresult = []
102

    
103
  def __init__(self, procedure, args):
104
    ss = ssconf.SimpleStore()
105
    self.port = ss.GetNodeDaemonPort()
106
    self.nodepw = ss.GetNodeDaemonPassword()
107
    self.nc = {}
108
    self.results = {}
109
    self.procedure = procedure
110
    self.args = args
111
    self.body = simplejson.dumps(args)
112

    
113
  #--- generic connector -------------
114

    
115
  def connect_list(self, node_list):
116
    """Add a list of nodes to the target nodes.
117

118
    """
119
    for node in node_list:
120
      self.connect(node)
121

    
122
  def connect(self, connect_node):
123
    """Add a node to the target list.
124

125
    """
126
    self.nc[connect_node] = nc = NodeController(self, connect_node)
127

    
128
  def getresult(self):
129
    """Return the results of the call.
130

131
    """
132
    return self.results
133

    
134
  def run(self):
135
    """Wrapper over reactor.run().
136

137
    This function simply calls reactor.run() if we have any requests
138
    queued, otherwise it does nothing.
139

140
    """
141
    for node, nc in self.nc.items():
142
      self.results[node] = nc.get_response()
143

    
144

    
145
def call_volume_list(node_list, vg_name):
146
  """Gets the logical volumes present in a given volume group.
147

148
  This is a multi-node call.
149

150
  """
151
  c = Client("volume_list", [vg_name])
152
  c.connect_list(node_list)
153
  c.run()
154
  return c.getresult()
155

    
156

    
157
def call_vg_list(node_list):
158
  """Gets the volume group list.
159

160
  This is a multi-node call.
161

162
  """
163
  c = Client("vg_list", [])
164
  c.connect_list(node_list)
165
  c.run()
166
  return c.getresult()
167

    
168

    
169
def call_bridges_exist(node, bridges_list):
170
  """Checks if a node has all the bridges given.
171

172
  This method checks if all bridges given in the bridges_list are
173
  present on the remote node, so that an instance that uses interfaces
174
  on those bridges can be started.
175

176
  This is a single-node call.
177

178
  """
179
  c = Client("bridges_exist", [bridges_list])
180
  c.connect(node)
181
  c.run()
182
  return c.getresult().get(node, False)
183

    
184

    
185
def call_instance_start(node, instance, extra_args):
186
  """Starts an instance.
187

188
  This is a single-node call.
189

190
  """
191
  c = Client("instance_start", [instance.ToDict(), extra_args])
192
  c.connect(node)
193
  c.run()
194
  return c.getresult().get(node, False)
195

    
196

    
197
def call_instance_shutdown(node, instance):
198
  """Stops an instance.
199

200
  This is a single-node call.
201

202
  """
203
  c = Client("instance_shutdown", [instance.ToDict()])
204
  c.connect(node)
205
  c.run()
206
  return c.getresult().get(node, False)
207

    
208

    
209
def call_instance_migrate(node, instance, target, live):
210
  """Migrate an instance.
211

212
  This is a single-node call.
213

214
  """
215
  c = Client("instance_migrate", [instance.name, target, live])
216
  c.connect(node)
217
  c.run()
218
  return c.getresult().get(node, False)
219

    
220

    
221
def call_instance_reboot(node, instance, reboot_type, extra_args):
222
  """Reboots an instance.
223

224
  This is a single-node call.
225

226
  """
227
  c = Client("instance_reboot", [instance.ToDict(), reboot_type, extra_args])
228
  c.connect(node)
229
  c.run()
230
  return c.getresult().get(node, False)
231

    
232

    
233
def call_instance_os_add(node, inst, osdev, swapdev):
234
  """Installs an OS on the given instance.
235

236
  This is a single-node call.
237

238
  """
239
  params = [inst.ToDict(), osdev, swapdev]
240
  c = Client("instance_os_add", params)
241
  c.connect(node)
242
  c.run()
243
  return c.getresult().get(node, False)
244

    
245

    
246
def call_instance_run_rename(node, inst, old_name, osdev, swapdev):
247
  """Run the OS rename script for an instance.
248

249
  This is a single-node call.
250

251
  """
252
  params = [inst.ToDict(), old_name, osdev, swapdev]
253
  c = Client("instance_run_rename", params)
254
  c.connect(node)
255
  c.run()
256
  return c.getresult().get(node, False)
257

    
258

    
259
def call_instance_info(node, instance):
260
  """Returns information about a single instance.
261

262
  This is a single-node call.
263

264
  """
265
  c = Client("instance_info", [instance])
266
  c.connect(node)
267
  c.run()
268
  return c.getresult().get(node, False)
269

    
270

    
271
def call_all_instances_info(node_list):
272
  """Returns information about all instances on a given node.
273

274
  This is a single-node call.
275

276
  """
277
  c = Client("all_instances_info", [])
278
  c.connect_list(node_list)
279
  c.run()
280
  return c.getresult()
281

    
282

    
283
def call_instance_list(node_list):
284
  """Returns the list of running instances on a given node.
285

286
  This is a single-node call.
287

288
  """
289
  c = Client("instance_list", [])
290
  c.connect_list(node_list)
291
  c.run()
292
  return c.getresult()
293

    
294

    
295
def call_node_tcp_ping(node, source, target, port, timeout, live_port_needed):
296
  """Do a TcpPing on the remote node
297

298
  This is a single-node call.
299
  """
300
  c = Client("node_tcp_ping", [source, target, port, timeout,
301
                               live_port_needed])
302
  c.connect(node)
303
  c.run()
304
  return c.getresult().get(node, False)
305

    
306

    
307
def call_node_info(node_list, vg_name):
308
  """Return node information.
309

310
  This will return memory information and volume group size and free
311
  space.
312

313
  This is a multi-node call.
314

315
  """
316
  c = Client("node_info", [vg_name])
317
  c.connect_list(node_list)
318
  c.run()
319
  retux = c.getresult()
320

    
321
  for node_name in retux:
322
    ret = retux.get(node_name, False)
323
    if type(ret) != dict:
324
      logger.Error("could not connect to node %s" % (node_name))
325
      ret = {}
326

    
327
    utils.CheckDict(ret,
328
                    { 'memory_total' : '-',
329
                      'memory_dom0' : '-',
330
                      'memory_free' : '-',
331
                      'vg_size' : 'node_unreachable',
332
                      'vg_free' : '-' },
333
                    "call_node_info",
334
                    )
335
  return retux
336

    
337

    
338
def call_node_add(node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
339
  """Add a node to the cluster.
340

341
  This is a single-node call.
342

343
  """
344
  params = [dsa, dsapub, rsa, rsapub, ssh, sshpub]
345
  c = Client("node_add", params)
346
  c.connect(node)
347
  c.run()
348
  return c.getresult().get(node, False)
349

    
350

    
351
def call_node_verify(node_list, checkdict):
352
  """Request verification of given parameters.
353

354
  This is a multi-node call.
355

356
  """
357
  c = Client("node_verify", [checkdict])
358
  c.connect_list(node_list)
359
  c.run()
360
  return c.getresult()
361

    
362

    
363
def call_node_start_master(node, start_daemons):
364
  """Tells a node to activate itself as a master.
365

366
  This is a single-node call.
367

368
  """
369
  c = Client("node_start_master", [start_daemons])
370
  c.connect(node)
371
  c.run()
372
  return c.getresult().get(node, False)
373

    
374

    
375
def call_node_stop_master(node, stop_daemons):
376
  """Tells a node to demote itself from master status.
377

378
  This is a single-node call.
379

380
  """
381
  c = Client("node_stop_master", [stop_daemons])
382
  c.connect(node)
383
  c.run()
384
  return c.getresult().get(node, False)
385

    
386

    
387
def call_version(node_list):
388
  """Query node version.
389

390
  This is a multi-node call.
391

392
  """
393
  c = Client("version", [])
394
  c.connect_list(node_list)
395
  c.run()
396
  return c.getresult()
397

    
398

    
399
def call_blockdev_create(node, bdev, size, owner, on_primary, info):
400
  """Request creation of a given block device.
401

402
  This is a single-node call.
403

404
  """
405
  params = [bdev.ToDict(), size, owner, on_primary, info]
406
  c = Client("blockdev_create", params)
407
  c.connect(node)
408
  c.run()
409
  return c.getresult().get(node, False)
410

    
411

    
412
def call_blockdev_remove(node, bdev):
413
  """Request removal of a given block device.
414

415
  This is a single-node call.
416

417
  """
418
  c = Client("blockdev_remove", [bdev.ToDict()])
419
  c.connect(node)
420
  c.run()
421
  return c.getresult().get(node, False)
422

    
423

    
424
def call_blockdev_rename(node, devlist):
425
  """Request rename of the given block devices.
426

427
  This is a single-node call.
428

429
  """
430
  params = [(d.ToDict(), uid) for d, uid in devlist]
431
  c = Client("blockdev_rename", params)
432
  c.connect(node)
433
  c.run()
434
  return c.getresult().get(node, False)
435

    
436

    
437
def call_blockdev_assemble(node, disk, owner, on_primary):
438
  """Request assembling of a given block device.
439

440
  This is a single-node call.
441

442
  """
443
  params = [disk.ToDict(), owner, on_primary]
444
  c = Client("blockdev_assemble", params)
445
  c.connect(node)
446
  c.run()
447
  return c.getresult().get(node, False)
448

    
449

    
450
def call_blockdev_shutdown(node, disk):
451
  """Request shutdown of a given block device.
452

453
  This is a single-node call.
454

455
  """
456
  c = Client("blockdev_shutdown", [disk.ToDict()])
457
  c.connect(node)
458
  c.run()
459
  return c.getresult().get(node, False)
460

    
461

    
462
def call_blockdev_addchildren(node, bdev, ndevs):
463
  """Request adding a list of children to a (mirroring) device.
464

465
  This is a single-node call.
466

467
  """
468
  params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
469
  c = Client("blockdev_addchildren", params)
470
  c.connect(node)
471
  c.run()
472
  return c.getresult().get(node, False)
473

    
474

    
475
def call_blockdev_removechildren(node, bdev, ndevs):
476
  """Request removing a list of children from a (mirroring) device.
477

478
  This is a single-node call.
479

480
  """
481
  params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
482
  c = Client("blockdev_removechildren", params)
483
  c.connect(node)
484
  c.run()
485
  return c.getresult().get(node, False)
486

    
487

    
488
def call_blockdev_getmirrorstatus(node, disks):
489
  """Request status of a (mirroring) device.
490

491
  This is a single-node call.
492

493
  """
494
  params = [dsk.ToDict() for dsk in disks]
495
  c = Client("blockdev_getmirrorstatus", params)
496
  c.connect(node)
497
  c.run()
498
  return c.getresult().get(node, False)
499

    
500

    
501
def call_blockdev_find(node, disk):
502
  """Request identification of a given block device.
503

504
  This is a single-node call.
505

506
  """
507
  c = Client("blockdev_find", [disk.ToDict()])
508
  c.connect(node)
509
  c.run()
510
  return c.getresult().get(node, False)
511

    
512

    
513
def call_blockdev_close(node, disks):
514
  """Closes the given block devices.
515

516
  This is a single-node call.
517

518
  """
519
  params = [cf.ToDict() for cf in disks]
520
  c = Client("blockdev_close", params)
521
  c.connect(node)
522
  c.run()
523
  return c.getresult().get(node, False)
524

    
525

    
526
def call_upload_file(node_list, file_name):
527
  """Upload a file.
528

529
  The node will refuse the operation in case the file is not on the
530
  approved file list.
531

532
  This is a multi-node call.
533

534
  """
535
  fh = file(file_name)
536
  try:
537
    data = fh.read()
538
  finally:
539
    fh.close()
540
  st = os.stat(file_name)
541
  params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
542
            st.st_atime, st.st_mtime]
543
  c = Client("upload_file", params)
544
  c.connect_list(node_list)
545
  c.run()
546
  return c.getresult()
547

    
548

    
549
def call_os_diagnose(node_list):
550
  """Request a diagnose of OS definitions.
551

552
  This is a multi-node call.
553

554
  """
555
  c = Client("os_diagnose", [])
556
  c.connect_list(node_list)
557
  c.run()
558
  result = c.getresult()
559
  new_result = {}
560
  for node_name in result:
561
    if result[node_name]:
562
      nr = [objects.OS.FromDict(oss) for oss in result[node_name]]
563
    else:
564
      nr = []
565
    new_result[node_name] = nr
566
  return new_result
567

    
568

    
569
def call_os_get(node, name):
570
  """Returns an OS definition.
571

572
  This is a single-node call.
573

574
  """
575
  c = Client("os_get", [name])
576
  c.connect(node)
577
  c.run()
578
  result = c.getresult().get(node, False)
579
  if isinstance(result, dict):
580
    return objects.OS.FromDict(result)
581
  else:
582
    return result
583

    
584

    
585
def call_hooks_runner(node_list, hpath, phase, env):
586
  """Call the hooks runner.
587

588
  Args:
589
    - op: the OpCode instance
590
    - env: a dictionary with the environment
591

592
  This is a multi-node call.
593

594
  """
595
  params = [hpath, phase, env]
596
  c = Client("hooks_runner", params)
597
  c.connect_list(node_list)
598
  c.run()
599
  result = c.getresult()
600
  return result
601

    
602

    
603
def call_iallocator_runner(node, name, idata):
604
  """Call an iallocator on a remote node
605

606
  Args:
607
    - name: the iallocator name
608
    - input: the json-encoded input string
609

610
  This is a single-node call.
611

612
  """
613
  params = [name, idata]
614
  c = Client("iallocator_runner", params)
615
  c.connect(node)
616
  c.run()
617
  result = c.getresult().get(node, False)
618
  return result
619

    
620

    
621
def call_blockdev_grow(node, cf_bdev, amount):
622
  """Request a snapshot of the given block device.
623

624
  This is a single-node call.
625

626
  """
627
  c = Client("blockdev_grow", [cf_bdev.ToDict(), amount])
628
  c.connect(node)
629
  c.run()
630
  return c.getresult().get(node, False)
631

    
632

    
633
def call_blockdev_snapshot(node, cf_bdev):
634
  """Request a snapshot of the given block device.
635

636
  This is a single-node call.
637

638
  """
639
  c = Client("blockdev_snapshot", [cf_bdev.ToDict()])
640
  c.connect(node)
641
  c.run()
642
  return c.getresult().get(node, False)
643

    
644

    
645
def call_snapshot_export(node, snap_bdev, dest_node, instance):
646
  """Request the export of a given snapshot.
647

648
  This is a single-node call.
649

650
  """
651
  params = [snap_bdev.ToDict(), dest_node, instance.ToDict()]
652
  c = Client("snapshot_export", params)
653
  c.connect(node)
654
  c.run()
655
  return c.getresult().get(node, False)
656

    
657

    
658
def call_finalize_export(node, instance, snap_disks):
659
  """Request the completion of an export operation.
660

661
  This writes the export config file, etc.
662

663
  This is a single-node call.
664

665
  """
666
  flat_disks = []
667
  for disk in snap_disks:
668
    flat_disks.append(disk.ToDict())
669
  params = [instance.ToDict(), flat_disks]
670
  c = Client("finalize_export", params)
671
  c.connect(node)
672
  c.run()
673
  return c.getresult().get(node, False)
674

    
675

    
676
def call_export_info(node, path):
677
  """Queries the export information in a given path.
678

679
  This is a single-node call.
680

681
  """
682
  c = Client("export_info", [path])
683
  c.connect(node)
684
  c.run()
685
  result = c.getresult().get(node, False)
686
  if not result:
687
    return result
688
  return objects.SerializableConfigParser.Loads(str(result))
689

    
690

    
691
def call_instance_os_import(node, inst, osdev, swapdev, src_node, src_image):
692
  """Request the import of a backup into an instance.
693

694
  This is a single-node call.
695

696
  """
697
  params = [inst.ToDict(), osdev, swapdev, src_node, src_image]
698
  c = Client("instance_os_import", params)
699
  c.connect(node)
700
  c.run()
701
  return c.getresult().get(node, False)
702

    
703

    
704
def call_export_list(node_list):
705
  """Gets the stored exports list.
706

707
  This is a multi-node call.
708

709
  """
710
  c = Client("export_list", [])
711
  c.connect_list(node_list)
712
  c.run()
713
  result = c.getresult()
714
  return result
715

    
716

    
717
def call_export_remove(node, export):
718
  """Requests removal of a given export.
719

720
  This is a single-node call.
721

722
  """
723
  c = Client("export_remove", [export])
724
  c.connect(node)
725
  c.run()
726
  return c.getresult().get(node, False)
727

    
728

    
729
def call_node_leave_cluster(node):
730
  """Requests a node to clean the cluster information it has.
731

732
  This will remove the configuration information from the ganeti data
733
  dir.
734

735
  This is a single-node call.
736

737
  """
738
  c = Client("node_leave_cluster", [])
739
  c.connect(node)
740
  c.run()
741
  return c.getresult().get(node, False)
742

    
743

    
744
def call_node_volumes(node_list):
745
  """Gets all volumes on node(s).
746

747
  This is a multi-node call.
748

749
  """
750
  c = Client("node_volumes", [])
751
  c.connect_list(node_list)
752
  c.run()
753
  return c.getresult()
754

    
755

    
756
def call_test_delay(node_list, duration):
757
  """Sleep for a fixed time on given node(s).
758

759
  This is a multi-node call.
760

761
  """
762
  c = Client("test_delay", [duration])
763
  c.connect_list(node_list)
764
  c.run()
765
  return c.getresult()
766

    
767

    
768
def call_file_storage_dir_create(node, file_storage_dir):
769
  """Create the given file storage directory.
770

771
  This is a single-node call.
772

773
  """
774
  c = Client("file_storage_dir_create", [file_storage_dir])
775
  c.connect(node)
776
  c.run()
777
  return c.getresult().get(node, False)
778

    
779

    
780
def call_file_storage_dir_remove(node, file_storage_dir):
781
  """Remove the given file storage directory.
782

783
  This is a single-node call.
784

785
  """
786
  c = Client("file_storage_dir_remove", [file_storage_dir])
787
  c.connect(node)
788
  c.run()
789
  return c.getresult().get(node, False)
790

    
791

    
792
def call_file_storage_dir_rename(node, old_file_storage_dir,
793
                                 new_file_storage_dir):
794
  """Rename file storage directory.
795

796
  This is a single-node call.
797

798
  """
799
  c = Client("file_storage_dir_rename",
800
             [old_file_storage_dir, new_file_storage_dir])
801
  c.connect(node)
802
  c.run()
803
  return c.getresult().get(node, False)