Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 6b90c22e

History | View | Annotate | Download (17.4 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script to show add a new node to the cluster
23

24
"""
25

    
26
# pylint: disable-msg=C0103
27

    
28
import os
29
import socket
30
import httplib
31

    
32
import simplejson
33

    
34
from ganeti import logger
35
from ganeti import utils
36
from ganeti import errors
37
from ganeti import constants
38
from ganeti import objects
39
from ganeti import ssconf
40

    
41

    
42
class NodeController:
43
  """Node-handling class.
44

45
  For each node that we speak with, we create an instance of this
46
  class, so that we have a safe place to store the details of this
47
  individual call.
48

49
  """
50
  def __init__(self, parent, node):
51
    self.parent = parent
52
    self.node = node
53
    self.failed = False
54

    
55
    self.http_conn = hc = httplib.HTTPConnection(node, self.parent.port)
56
    try:
57
      hc.connect()
58
      hc.putrequest('PUT', "/%s" % self.parent.procedure,
59
                    skip_accept_encoding=True)
60
      hc.putheader('Content-Length', str(len(parent.body)))
61
      hc.endheaders()
62
      hc.send(parent.body)
63
    except socket.error, err:
64
      logger.Error("Error connecting to %s: %s" % (node, str(err)))
65
      self.failed = True
66

    
67
  def get_response(self):
68
    """Try to process the response from the node.
69

70
    """
71
    if self.failed:
72
      # we already failed in connect
73
      return False
74
    resp = self.http_conn.getresponse()
75
    if resp.status != 200:
76
      return False
77
    try:
78
      length = int(resp.getheader('Content-Length', '0'))
79
    except ValueError:
80
      return False
81
    if not length:
82
      logger.Error("Zero-length reply from %s" % self.node)
83
      return False
84
    payload = resp.read(length)
85
    unload = simplejson.loads(payload)
86
    return unload
87

    
88

    
89
class Client:
90
  """RPC Client class.
91

92
  This class, given a (remote) method name, a list of parameters and a
93
  list of nodes, will contact (in parallel) all nodes, and return a
94
  dict of results (key: node name, value: result).
95

96
  One current bug is that generic failure is still signalled by
97
  'False' result, which is not good. This overloading of values can
98
  cause bugs.
99

100
  """
101
  result_set = False
102
  result = False
103
  allresult = []
104

    
105
  def __init__(self, procedure, args):
106
    ss = ssconf.SimpleStore()
107
    self.port = ss.GetNodeDaemonPort()
108
    self.nodepw = ss.GetNodeDaemonPassword()
109
    self.nc = {}
110
    self.results = {}
111
    self.procedure = procedure
112
    self.args = args
113
    self.body = simplejson.dumps(args)
114

    
115
  #--- generic connector -------------
116

    
117
  def connect_list(self, node_list):
118
    """Add a list of nodes to the target nodes.
119

120
    """
121
    for node in node_list:
122
      self.connect(node)
123

    
124
  def connect(self, connect_node):
125
    """Add a node to the target list.
126

127
    """
128
    self.nc[connect_node] = nc = NodeController(self, connect_node)
129

    
130
  def getresult(self):
131
    """Return the results of the call.
132

133
    """
134
    return self.results
135

    
136
  def run(self):
137
    """Wrapper over reactor.run().
138

139
    This function simply calls reactor.run() if we have any requests
140
    queued, otherwise it does nothing.
141

142
    """
143
    for node, nc in self.nc.items():
144
      self.results[node] = nc.get_response()
145

    
146

    
147
def call_volume_list(node_list, vg_name):
148
  """Gets the logical volumes present in a given volume group.
149

150
  This is a multi-node call.
151

152
  """
153
  c = Client("volume_list", [vg_name])
154
  c.connect_list(node_list)
155
  c.run()
156
  return c.getresult()
157

    
158

    
159
def call_vg_list(node_list):
160
  """Gets the volume group list.
161

162
  This is a multi-node call.
163

164
  """
165
  c = Client("vg_list", [])
166
  c.connect_list(node_list)
167
  c.run()
168
  return c.getresult()
169

    
170

    
171
def call_bridges_exist(node, bridges_list):
172
  """Checks if a node has all the bridges given.
173

174
  This method checks if all bridges given in the bridges_list are
175
  present on the remote node, so that an instance that uses interfaces
176
  on those bridges can be started.
177

178
  This is a single-node call.
179

180
  """
181
  c = Client("bridges_exist", [bridges_list])
182
  c.connect(node)
183
  c.run()
184
  return c.getresult().get(node, False)
185

    
186

    
187
def call_instance_start(node, instance, extra_args):
188
  """Starts an instance.
189

190
  This is a single-node call.
191

192
  """
193
  c = Client("instance_start", [instance.ToDict(), extra_args])
194
  c.connect(node)
195
  c.run()
196
  return c.getresult().get(node, False)
197

    
198

    
199
def call_instance_shutdown(node, instance):
200
  """Stops an instance.
201

202
  This is a single-node call.
203

204
  """
205
  c = Client("instance_shutdown", [instance.ToDict()])
206
  c.connect(node)
207
  c.run()
208
  return c.getresult().get(node, False)
209

    
210

    
211
def call_instance_migrate(node, instance, target, live):
212
  """Migrate an instance.
213

214
  This is a single-node call.
215

216
  """
217
  c = Client("instance_migrate", [instance.name, target, live])
218
  c.connect(node)
219
  c.run()
220
  return c.getresult().get(node, False)
221

    
222

    
223
def call_instance_reboot(node, instance, reboot_type, extra_args):
224
  """Reboots an instance.
225

226
  This is a single-node call.
227

228
  """
229
  c = Client("instance_reboot", [instance.ToDict(), reboot_type, extra_args])
230
  c.connect(node)
231
  c.run()
232
  return c.getresult().get(node, False)
233

    
234

    
235
def call_instance_os_add(node, inst, osdev, swapdev):
236
  """Installs an OS on the given instance.
237

238
  This is a single-node call.
239

240
  """
241
  params = [inst.ToDict(), osdev, swapdev]
242
  c = Client("instance_os_add", params)
243
  c.connect(node)
244
  c.run()
245
  return c.getresult().get(node, False)
246

    
247

    
248
def call_instance_run_rename(node, inst, old_name, osdev, swapdev):
249
  """Run the OS rename script for an instance.
250

251
  This is a single-node call.
252

253
  """
254
  params = [inst.ToDict(), old_name, osdev, swapdev]
255
  c = Client("instance_run_rename", params)
256
  c.connect(node)
257
  c.run()
258
  return c.getresult().get(node, False)
259

    
260

    
261
def call_instance_info(node, instance):
262
  """Returns information about a single instance.
263

264
  This is a single-node call.
265

266
  """
267
  c = Client("instance_info", [instance])
268
  c.connect(node)
269
  c.run()
270
  return c.getresult().get(node, False)
271

    
272

    
273
def call_all_instances_info(node_list):
274
  """Returns information about all instances on a given node.
275

276
  This is a single-node call.
277

278
  """
279
  c = Client("all_instances_info", [])
280
  c.connect_list(node_list)
281
  c.run()
282
  return c.getresult()
283

    
284

    
285
def call_instance_list(node_list):
286
  """Returns the list of running instances on a given node.
287

288
  This is a single-node call.
289

290
  """
291
  c = Client("instance_list", [])
292
  c.connect_list(node_list)
293
  c.run()
294
  return c.getresult()
295

    
296

    
297
def call_node_tcp_ping(node, source, target, port, timeout, live_port_needed):
298
  """Do a TcpPing on the remote node
299

300
  This is a single-node call.
301
  """
302
  c = Client("node_tcp_ping", [source, target, port, timeout,
303
                               live_port_needed])
304
  c.connect(node)
305
  c.run()
306
  return c.getresult().get(node, False)
307

    
308

    
309
def call_node_info(node_list, vg_name):
310
  """Return node information.
311

312
  This will return memory information and volume group size and free
313
  space.
314

315
  This is a multi-node call.
316

317
  """
318
  c = Client("node_info", [vg_name])
319
  c.connect_list(node_list)
320
  c.run()
321
  retux = c.getresult()
322

    
323
  for node_name in retux:
324
    ret = retux.get(node_name, False)
325
    if type(ret) != dict:
326
      logger.Error("could not connect to node %s" % (node_name))
327
      ret = {}
328

    
329
    utils.CheckDict(ret,
330
                    { 'memory_total' : '-',
331
                      'memory_dom0' : '-',
332
                      'memory_free' : '-',
333
                      'vg_size' : 'node_unreachable',
334
                      'vg_free' : '-' },
335
                    "call_node_info",
336
                    )
337
  return retux
338

    
339

    
340
def call_node_add(node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
341
  """Add a node to the cluster.
342

343
  This is a single-node call.
344

345
  """
346
  params = [dsa, dsapub, rsa, rsapub, ssh, sshpub]
347
  c = Client("node_add", params)
348
  c.connect(node)
349
  c.run()
350
  return c.getresult().get(node, False)
351

    
352

    
353
def call_node_verify(node_list, checkdict):
354
  """Request verification of given parameters.
355

356
  This is a multi-node call.
357

358
  """
359
  c = Client("node_verify", [checkdict])
360
  c.connect_list(node_list)
361
  c.run()
362
  return c.getresult()
363

    
364

    
365
def call_node_start_master(node):
366
  """Tells a node to activate itself as a master.
367

368
  This is a single-node call.
369

370
  """
371
  c = Client("node_start_master", [])
372
  c.connect(node)
373
  c.run()
374
  return c.getresult().get(node, False)
375

    
376

    
377
def call_node_stop_master(node):
378
  """Tells a node to demote itself from master status.
379

380
  This is a single-node call.
381

382
  """
383
  c = Client("node_stop_master", [])
384
  c.connect(node)
385
  c.run()
386
  return c.getresult().get(node, False)
387

    
388

    
389
def call_version(node_list):
390
  """Query node version.
391

392
  This is a multi-node call.
393

394
  """
395
  c = Client("version", [])
396
  c.connect_list(node_list)
397
  c.run()
398
  return c.getresult()
399

    
400

    
401
def call_blockdev_create(node, bdev, size, owner, on_primary, info):
402
  """Request creation of a given block device.
403

404
  This is a single-node call.
405

406
  """
407
  params = [bdev.ToDict(), size, owner, on_primary, info]
408
  c = Client("blockdev_create", params)
409
  c.connect(node)
410
  c.run()
411
  return c.getresult().get(node, False)
412

    
413

    
414
def call_blockdev_remove(node, bdev):
415
  """Request removal of a given block device.
416

417
  This is a single-node call.
418

419
  """
420
  c = Client("blockdev_remove", [bdev.ToDict()])
421
  c.connect(node)
422
  c.run()
423
  return c.getresult().get(node, False)
424

    
425

    
426
def call_blockdev_rename(node, devlist):
427
  """Request rename of the given block devices.
428

429
  This is a single-node call.
430

431
  """
432
  params = [(d.ToDict(), uid) for d, uid in devlist]
433
  c = Client("blockdev_rename", params)
434
  c.connect(node)
435
  c.run()
436
  return c.getresult().get(node, False)
437

    
438

    
439
def call_blockdev_assemble(node, disk, owner, on_primary):
440
  """Request assembling of a given block device.
441

442
  This is a single-node call.
443

444
  """
445
  params = [disk.ToDict(), owner, on_primary]
446
  c = Client("blockdev_assemble", params)
447
  c.connect(node)
448
  c.run()
449
  return c.getresult().get(node, False)
450

    
451

    
452
def call_blockdev_shutdown(node, disk):
453
  """Request shutdown of a given block device.
454

455
  This is a single-node call.
456

457
  """
458
  c = Client("blockdev_shutdown", [disk.ToDict()])
459
  c.connect(node)
460
  c.run()
461
  return c.getresult().get(node, False)
462

    
463

    
464
def call_blockdev_addchildren(node, bdev, ndevs):
465
  """Request adding a list of children to a (mirroring) device.
466

467
  This is a single-node call.
468

469
  """
470
  params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
471
  c = Client("blockdev_addchildren", params)
472
  c.connect(node)
473
  c.run()
474
  return c.getresult().get(node, False)
475

    
476

    
477
def call_blockdev_removechildren(node, bdev, ndevs):
478
  """Request removing a list of children from a (mirroring) device.
479

480
  This is a single-node call.
481

482
  """
483
  params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
484
  c = Client("blockdev_removechildren", params)
485
  c.connect(node)
486
  c.run()
487
  return c.getresult().get(node, False)
488

    
489

    
490
def call_blockdev_getmirrorstatus(node, disks):
491
  """Request status of a (mirroring) device.
492

493
  This is a single-node call.
494

495
  """
496
  params = [dsk.ToDict() for dsk in disks]
497
  c = Client("blockdev_getmirrorstatus", params)
498
  c.connect(node)
499
  c.run()
500
  return c.getresult().get(node, False)
501

    
502

    
503
def call_blockdev_find(node, disk):
504
  """Request identification of a given block device.
505

506
  This is a single-node call.
507

508
  """
509
  c = Client("blockdev_find", [disk.ToDict()])
510
  c.connect(node)
511
  c.run()
512
  return c.getresult().get(node, False)
513

    
514

    
515
def call_upload_file(node_list, file_name):
516
  """Upload a file.
517

518
  The node will refuse the operation in case the file is not on the
519
  approved file list.
520

521
  This is a multi-node call.
522

523
  """
524
  fh = file(file_name)
525
  try:
526
    data = fh.read()
527
  finally:
528
    fh.close()
529
  st = os.stat(file_name)
530
  params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
531
            st.st_atime, st.st_mtime]
532
  c = Client("upload_file", params)
533
  c.connect_list(node_list)
534
  c.run()
535
  return c.getresult()
536

    
537

    
538
def call_os_diagnose(node_list):
539
  """Request a diagnose of OS definitions.
540

541
  This is a multi-node call.
542

543
  """
544
  c = Client("os_diagnose", [])
545
  c.connect_list(node_list)
546
  c.run()
547
  result = c.getresult()
548
  new_result = {}
549
  for node_name in result:
550
    if result[node_name]:
551
      nr = [objects.OS.FromDict(oss) for oss in result[node_name]]
552
    else:
553
      nr = []
554
    new_result[node_name] = nr
555
  return new_result
556

    
557

    
558
def call_os_get(node, name):
559
  """Returns an OS definition.
560

561
  This is a single-node call.
562

563
  """
564
  c = Client("os_get", [name])
565
  c.connect(node)
566
  c.run()
567
  result = c.getresult().get(node, False)
568
  if isinstance(result, dict):
569
    return objects.OS.FromDict(result)
570
  else:
571
    return result
572

    
573

    
574
def call_hooks_runner(node_list, hpath, phase, env):
575
  """Call the hooks runner.
576

577
  Args:
578
    - op: the OpCode instance
579
    - env: a dictionary with the environment
580

581
  This is a multi-node call.
582

583
  """
584
  params = [hpath, phase, env]
585
  c = Client("hooks_runner", params)
586
  c.connect_list(node_list)
587
  c.run()
588
  result = c.getresult()
589
  return result
590

    
591

    
592
def call_iallocator_runner(node, name, idata):
593
  """Call an iallocator on a remote node
594

595
  Args:
596
    - name: the iallocator name
597
    - input: the json-encoded input string
598

599
  This is a single-node call.
600

601
  """
602
  params = [name, idata]
603
  c = Client("iallocator_runner", params)
604
  c.connect(node)
605
  c.run()
606
  result = c.getresult().get(node, False)
607
  return result
608

    
609

    
610
def call_blockdev_grow(node, cf_bdev, amount):
611
  """Request a snapshot of the given block device.
612

613
  This is a single-node call.
614

615
  """
616
  c = Client("blockdev_grow", [cf_bdev.ToDict(), amount])
617
  c.connect(node)
618
  c.run()
619
  return c.getresult().get(node, False)
620

    
621

    
622
def call_blockdev_snapshot(node, cf_bdev):
623
  """Request a snapshot of the given block device.
624

625
  This is a single-node call.
626

627
  """
628
  c = Client("blockdev_snapshot", [cf_bdev.ToDict()])
629
  c.connect(node)
630
  c.run()
631
  return c.getresult().get(node, False)
632

    
633

    
634
def call_snapshot_export(node, snap_bdev, dest_node, instance):
635
  """Request the export of a given snapshot.
636

637
  This is a single-node call.
638

639
  """
640
  params = [snap_bdev.ToDict(), dest_node, instance.ToDict()]
641
  c = Client("snapshot_export", params)
642
  c.connect(node)
643
  c.run()
644
  return c.getresult().get(node, False)
645

    
646

    
647
def call_finalize_export(node, instance, snap_disks):
648
  """Request the completion of an export operation.
649

650
  This writes the export config file, etc.
651

652
  This is a single-node call.
653

654
  """
655
  flat_disks = []
656
  for disk in snap_disks:
657
    flat_disks.append(disk.ToDict())
658
  params = [instance.ToDict(), flat_disks]
659
  c = Client("finalize_export", params)
660
  c.connect(node)
661
  c.run()
662
  return c.getresult().get(node, False)
663

    
664

    
665
def call_export_info(node, path):
666
  """Queries the export information in a given path.
667

668
  This is a single-node call.
669

670
  """
671
  c = Client("export_info", [path])
672
  c.connect(node)
673
  c.run()
674
  result = c.getresult().get(node, False)
675
  if not result:
676
    return result
677
  return objects.SerializableConfigParser.Loads(str(result))
678

    
679

    
680
def call_instance_os_import(node, inst, osdev, swapdev, src_node, src_image):
681
  """Request the import of a backup into an instance.
682

683
  This is a single-node call.
684

685
  """
686
  params = [inst.ToDict(), osdev, swapdev, src_node, src_image]
687
  c = Client("instance_os_import", params)
688
  c.connect(node)
689
  c.run()
690
  return c.getresult().get(node, False)
691

    
692

    
693
def call_export_list(node_list):
694
  """Gets the stored exports list.
695

696
  This is a multi-node call.
697

698
  """
699
  c = Client("export_list", [])
700
  c.connect_list(node_list)
701
  c.run()
702
  result = c.getresult()
703
  return result
704

    
705

    
706
def call_export_remove(node, export):
707
  """Requests removal of a given export.
708

709
  This is a single-node call.
710

711
  """
712
  c = Client("export_remove", [export])
713
  c.connect(node)
714
  c.run()
715
  return c.getresult().get(node, False)
716

    
717

    
718
def call_node_leave_cluster(node):
719
  """Requests a node to clean the cluster information it has.
720

721
  This will remove the configuration information from the ganeti data
722
  dir.
723

724
  This is a single-node call.
725

726
  """
727
  c = Client("node_leave_cluster", [])
728
  c.connect(node)
729
  c.run()
730
  return c.getresult().get(node, False)
731

    
732

    
733
def call_node_volumes(node_list):
734
  """Gets all volumes on node(s).
735

736
  This is a multi-node call.
737

738
  """
739
  c = Client("node_volumes", [])
740
  c.connect_list(node_list)
741
  c.run()
742
  return c.getresult()
743

    
744

    
745
def call_test_delay(node_list, duration):
746
  """Sleep for a fixed time on given node(s).
747

748
  This is a multi-node call.
749

750
  """
751
  c = Client("test_delay", [duration])
752
  c.connect_list(node_list)
753
  c.run()
754
  return c.getresult()
755

    
756

    
757
def call_file_storage_dir_create(node, file_storage_dir):
758
  """Create the given file storage directory.
759

760
  This is a single-node call.
761

762
  """
763
  c = Client("file_storage_dir_create", [file_storage_dir])
764
  c.connect(node)
765
  c.run()
766
  return c.getresult().get(node, False)
767

    
768

    
769
def call_file_storage_dir_remove(node, file_storage_dir):
770
  """Remove the given file storage directory.
771

772
  This is a single-node call.
773

774
  """
775
  c = Client("file_storage_dir_remove", [file_storage_dir])
776
  c.connect(node)
777
  c.run()
778
  return c.getresult().get(node, False)
779

    
780

    
781
def call_file_storage_dir_rename(node, old_file_storage_dir,
782
                                 new_file_storage_dir):
783
  """Rename file storage directory.
784

785
  This is a single-node call.
786

787
  """
788
  c = Client("file_storage_dir_rename",
789
             [old_file_storage_dir, new_file_storage_dir])
790
  c.connect(node)
791
  c.run()
792
  return c.getresult().get(node, False)
793