migration-rdma.c @ f53ec699
1
/*
2
 * RDMA protocol and interfaces
3
 *
4
 * Copyright IBM, Corp. 2010-2013
5
 *
6
 * Authors:
7
 *  Michael R. Hines <mrhines@us.ibm.com>
8
 *  Jiuxing Liu <jl@us.ibm.com>
9
 *
10
 * This work is licensed under the terms of the GNU GPL, version 2 or
11
 * later.  See the COPYING file in the top-level directory.
12
 *
13
 */
14
#include "qemu-common.h"
15
#include "migration/migration.h"
16
#include "migration/qemu-file.h"
17
#include "exec/cpu-common.h"
18
#include "qemu/main-loop.h"
19
#include "qemu/sockets.h"
20
#include "qemu/bitmap.h"
21
#include "block/coroutine.h"
22
#include <stdio.h>
23
#include <sys/types.h>
24
#include <sys/socket.h>
25
#include <netdb.h>
26
#include <arpa/inet.h>
27
#include <string.h>
28
#include <rdma/rdma_cma.h>
29

    
30
#define DEBUG_RDMA
31
//#define DEBUG_RDMA_VERBOSE
32
//#define DEBUG_RDMA_REALLY_VERBOSE
33

    
34
#ifdef DEBUG_RDMA
35
#define DPRINTF(fmt, ...) \
36
    do { printf("rdma: " fmt, ## __VA_ARGS__); } while (0)
37
#else
38
#define DPRINTF(fmt, ...) \
39
    do { } while (0)
40
#endif
41

    
42
#ifdef DEBUG_RDMA_VERBOSE
43
#define DDPRINTF(fmt, ...) \
44
    do { printf("rdma: " fmt, ## __VA_ARGS__); } while (0)
45
#else
46
#define DDPRINTF(fmt, ...) \
47
    do { } while (0)
48
#endif
49

    
50
#ifdef DEBUG_RDMA_REALLY_VERBOSE
51
#define DDDPRINTF(fmt, ...) \
52
    do { printf("rdma: " fmt, ## __VA_ARGS__); } while (0)
53
#else
54
#define DDDPRINTF(fmt, ...) \
55
    do { } while (0)
56
#endif
57

    
58
/*
59
 * Print an error on both the Monitor and the Log file.
60
 */
61
#define ERROR(errp, fmt, ...) \
62
    do { \
63
        fprintf(stderr, "RDMA ERROR: " fmt, ## __VA_ARGS__); \
64
        if (errp && (*(errp) == NULL)) { \
65
            error_setg(errp, "RDMA ERROR: " fmt, ## __VA_ARGS__); \
66
        } \
67
    } while (0)
68

    
69
#define RDMA_RESOLVE_TIMEOUT_MS 10000
70

    
71
/* Do not merge data if larger than this. */
72
#define RDMA_MERGE_MAX (2 * 1024 * 1024)
73
#define RDMA_SIGNALED_SEND_MAX (RDMA_MERGE_MAX / 4096)
74

    
75
#define RDMA_REG_CHUNK_SHIFT 20 /* 1 MB */
76

    
77
/*
78
 * This is only for non-live state being migrated.
79
 * Instead of RDMA_WRITE messages, we use RDMA_SEND
80
 * messages for that state, which requires a different
81
 * delivery design than main memory.
82
 */
83
#define RDMA_SEND_INCREMENT 32768
84

    
85
/*
86
 * Maximum size infiniband SEND message
87
 */
88
#define RDMA_CONTROL_MAX_BUFFER (512 * 1024)
89
#define RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE 4096
90

    
91
#define RDMA_CONTROL_VERSION_CURRENT 1
92
/*
93
 * Capabilities for negotiation.
94
 */
95
#define RDMA_CAPABILITY_PIN_ALL 0x01
96

    
97
/*
98
 * Add the other flags above to this list of known capabilities
99
 * as they are introduced.
100
 */
101
static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL;
102

    
103
#define CHECK_ERROR_STATE() \
104
    do { \
105
        if (rdma->error_state) { \
106
            if (!rdma->error_reported) { \
107
                fprintf(stderr, "RDMA is in an error state waiting migration" \
108
                                " to abort!\n"); \
109
                rdma->error_reported = 1; \
110
            } \
111
            return rdma->error_state; \
112
        } \
113
    } while (0);
114

    
115
/*
116
 * A work request ID is 64-bits and we split up these bits
117
 * into 3 parts:
118
 *
119
 * bits 0-15 : type of control message, 2^16
120
 * bits 16-29: ram block index, 2^14
121
 * bits 30-63: ram block chunk number, 2^34
122
 *
123
 * The last two bit ranges are only used for RDMA writes,
124
 * in order to track their completion and potentially
125
 * also track unregistration status of the message.
126
 */
127
#define RDMA_WRID_TYPE_SHIFT  0UL
128
#define RDMA_WRID_BLOCK_SHIFT 16UL
129
#define RDMA_WRID_CHUNK_SHIFT 30UL
130

    
131
#define RDMA_WRID_TYPE_MASK \
132
    ((1UL << RDMA_WRID_BLOCK_SHIFT) - 1UL)
133

    
134
#define RDMA_WRID_BLOCK_MASK \
135
    (~RDMA_WRID_TYPE_MASK & ((1UL << RDMA_WRID_CHUNK_SHIFT) - 1UL))
136

    
137
#define RDMA_WRID_CHUNK_MASK (~RDMA_WRID_BLOCK_MASK & ~RDMA_WRID_TYPE_MASK)
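
/*
 * Worked example: an RDMA write for chunk 7 of ram block 3 carries
 * wr_id = (7UL << RDMA_WRID_CHUNK_SHIFT) | (3UL << RDMA_WRID_BLOCK_SHIFT)
 *         | RDMA_WRID_RDMA_WRITE;
 * the three masks above recover each field from a completed wr_id.
 */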
138

    
139
/*
140
 * RDMA migration protocol:
141
 * 1. RDMA Writes (data messages, i.e. RAM)
142
 * 2. IB Send/Recv (control channel messages)
143
 */
144
enum {
145
    RDMA_WRID_NONE = 0,
146
    RDMA_WRID_RDMA_WRITE = 1,
147
    RDMA_WRID_SEND_CONTROL = 2000,
148
    RDMA_WRID_RECV_CONTROL = 4000,
149
};
150

    
151
const char *wrid_desc[] = {
152
    [RDMA_WRID_NONE] = "NONE",
153
    [RDMA_WRID_RDMA_WRITE] = "WRITE RDMA",
154
    [RDMA_WRID_SEND_CONTROL] = "CONTROL SEND",
155
    [RDMA_WRID_RECV_CONTROL] = "CONTROL RECV",
156
};
157

    
158
/*
159
 * Work request IDs for IB SEND messages only (not RDMA writes).
160
 * This is used by the migration protocol to transmit
161
 * control messages (such as device state and registration commands).
162
 *
163
 * We could use more WRs, but we have enough for now.
164
 */
165
enum {
166
    RDMA_WRID_READY = 0,
167
    RDMA_WRID_DATA,
168
    RDMA_WRID_CONTROL,
169
    RDMA_WRID_MAX,
170
};
171

    
172
/*
173
 * SEND/RECV IB Control Messages.
174
 */
175
enum {
176
    RDMA_CONTROL_NONE = 0,
177
    RDMA_CONTROL_ERROR,
178
    RDMA_CONTROL_READY,               /* ready to receive */
179
    RDMA_CONTROL_QEMU_FILE,           /* QEMUFile-transmitted bytes */
180
    RDMA_CONTROL_RAM_BLOCKS_REQUEST,  /* RAMBlock synchronization */
181
    RDMA_CONTROL_RAM_BLOCKS_RESULT,   /* RAMBlock synchronization */
182
    RDMA_CONTROL_COMPRESS,            /* page contains repeat values */
183
    RDMA_CONTROL_REGISTER_REQUEST,    /* dynamic page registration */
184
    RDMA_CONTROL_REGISTER_RESULT,     /* key to use after registration */
185
    RDMA_CONTROL_REGISTER_FINISHED,   /* current iteration finished */
186
    RDMA_CONTROL_UNREGISTER_REQUEST,  /* dynamic UN-registration */
187
    RDMA_CONTROL_UNREGISTER_FINISHED, /* unpinning finished */
188
};
189

    
190
const char *control_desc[] = {
191
    [RDMA_CONTROL_NONE] = "NONE",
192
    [RDMA_CONTROL_ERROR] = "ERROR",
193
    [RDMA_CONTROL_READY] = "READY",
194
    [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE",
195
    [RDMA_CONTROL_RAM_BLOCKS_REQUEST] = "RAM BLOCKS REQUEST",
196
    [RDMA_CONTROL_RAM_BLOCKS_RESULT] = "RAM BLOCKS RESULT",
197
    [RDMA_CONTROL_COMPRESS] = "COMPRESS",
198
    [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST",
199
    [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT",
200
    [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED",
201
    [RDMA_CONTROL_UNREGISTER_REQUEST] = "UNREGISTER REQUEST",
202
    [RDMA_CONTROL_UNREGISTER_FINISHED] = "UNREGISTER FINISHED",
203
};
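
/*
 * Typical exchange: the receiving side sends a READY message and waits;
 * the sending side waits for READY and then delivers one of the commands
 * above (see qemu_rdma_exchange_send() and qemu_rdma_exchange_recv() below).
 */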
204

    
205
/*
206
 * Memory and MR structures used to represent an IB Send/Recv work request.
207
 * This is *not* used for RDMA writes, only IB Send/Recv.
208
 */
209
typedef struct {
210
    uint8_t  control[RDMA_CONTROL_MAX_BUFFER]; /* actual buffer to register */
211
    struct   ibv_mr *control_mr;               /* registration metadata */
212
    size_t   control_len;                      /* length of the message */
213
    uint8_t *control_curr;                     /* start of unconsumed bytes */
214
} RDMAWorkRequestData;
215

    
216
/*
217
 * Negotiate RDMA capabilities during connection-setup time.
218
 */
219
typedef struct {
220
    uint32_t version;
221
    uint32_t flags;
222
} RDMACapabilities;
223

    
224
static void caps_to_network(RDMACapabilities *cap)
225
{
226
    cap->version = htonl(cap->version);
227
    cap->flags = htonl(cap->flags);
228
}
229

    
230
static void network_to_caps(RDMACapabilities *cap)
231
{
232
    cap->version = ntohl(cap->version);
233
    cap->flags = ntohl(cap->flags);
234
}
235

    
236
/*
237
 * Representation of a RAMBlock from an RDMA perspective.
238
 * This is not transmitted, only local.
239
 * This and subsequent structures cannot be linked lists
240
 * because we're using a single IB message to transmit
241
 * the information. It's small anyway, so a list is overkill.
242
 */
243
typedef struct RDMALocalBlock {
244
    uint8_t  *local_host_addr; /* local virtual address */
245
    uint64_t remote_host_addr; /* remote virtual address */
246
    uint64_t offset;
247
    uint64_t length;
248
    struct   ibv_mr **pmr;     /* MRs for chunk-level registration */
249
    struct   ibv_mr *mr;       /* MR for non-chunk-level registration */
250
    uint32_t *remote_keys;     /* rkeys for chunk-level registration */
251
    uint32_t remote_rkey;      /* rkeys for non-chunk-level registration */
252
    int      index;            /* which block are we */
253
    bool     is_ram_block;
254
    int      nb_chunks;
255
    unsigned long *transit_bitmap;
256
    unsigned long *unregister_bitmap;
257
} RDMALocalBlock;
258

    
259
/*
260
 * Also represents a RAMblock, but only on the dest.
261
 * This gets transmitted by the dest during connection-time
262
 * to the source VM and then is used to populate the
263
 * corresponding RDMALocalBlock with
264
 * the information needed to perform the actual RDMA.
265
 */
266
typedef struct QEMU_PACKED RDMARemoteBlock {
267
    uint64_t remote_host_addr;
268
    uint64_t offset;
269
    uint64_t length;
270
    uint32_t remote_rkey;
271
    uint32_t padding;
272
} RDMARemoteBlock;
273

    
274
static uint64_t htonll(uint64_t v)
275
{
276
    union { uint32_t lv[2]; uint64_t llv; } u;
277
    u.lv[0] = htonl(v >> 32);
278
    u.lv[1] = htonl(v & 0xFFFFFFFFULL);
279
    return u.llv;
280
}
281

    
282
static uint64_t ntohll(uint64_t v) {
283
    union { uint32_t lv[2]; uint64_t llv; } u;
284
    u.llv = v;
285
    return ((uint64_t)ntohl(u.lv[0]) << 32) | (uint64_t) ntohl(u.lv[1]);
286
}
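
/*
 * Example: htonll(0x0102030405060708ULL) produces a value whose in-memory
 * bytes read 01 02 03 04 05 06 07 08 (network byte order) regardless of
 * host endianness; ntohll() reverses the conversion.
 */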
287

    
288
static void remote_block_to_network(RDMARemoteBlock *rb)
289
{
290
    rb->remote_host_addr = htonll(rb->remote_host_addr);
291
    rb->offset = htonll(rb->offset);
292
    rb->length = htonll(rb->length);
293
    rb->remote_rkey = htonl(rb->remote_rkey);
294
}
295

    
296
static void network_to_remote_block(RDMARemoteBlock *rb)
297
{
298
    rb->remote_host_addr = ntohll(rb->remote_host_addr);
299
    rb->offset = ntohll(rb->offset);
300
    rb->length = ntohll(rb->length);
301
    rb->remote_rkey = ntohl(rb->remote_rkey);
302
}
303

    
304
/*
305
 * Virtual address of the above structures used for transmitting
306
 * the RAMBlock descriptions at connection-time.
307
 * This structure is *not* transmitted.
308
 */
309
typedef struct RDMALocalBlocks {
310
    int nb_blocks;
311
    bool     init;             /* main memory init complete */
312
    RDMALocalBlock *block;
313
} RDMALocalBlocks;
314

    
315
/*
316
 * Main data structure for RDMA state.
317
 * While there is only one copy of this structure being allocated right now,
318
 * this is the place where you would start if you wanted to support
319
 * more than one RDMA connection open at the same time.
320
 */
321
typedef struct RDMAContext {
322
    char *host;
323
    int port;
324

    
325
    RDMAWorkRequestData wr_data[RDMA_WRID_MAX + 1];
326

    
327
    /*
328
     * This is used by *_exchange_send() to figure out whether or not
329
     * the initial "READY" message has already been received.
330
     * This is because other functions may potentially poll() and detect
331
     * the READY message before send() does, in which case we need to
332
     * know if it completed.
333
     */
334
    int control_ready_expected;
335

    
336
    /* number of outstanding writes */
337
    int nb_sent;
338

    
339
    /* store info about current buffer so that we can
340
       merge it with future sends */
341
    uint64_t current_addr;
342
    uint64_t current_length;
343
    /* index of ram block the current buffer belongs to */
344
    int current_index;
345
    /* index of the chunk in the current ram block */
346
    int current_chunk;
347

    
348
    bool pin_all;
349

    
350
    /*
351
     * infiniband-specific variables for opening the device
352
     * and maintaining connection state and so forth.
353
     *
354
     * cm_id also has ibv_context, rdma_event_channel, and ibv_qp in
355
     * cm_id->verbs, cm_id->channel, and cm_id->qp.
356
     */
357
    struct rdma_cm_id *cm_id;               /* connection manager ID */
358
    struct rdma_cm_id *listen_id;
359

    
360
    struct ibv_context          *verbs;
361
    struct rdma_event_channel   *channel;
362
    struct ibv_qp *qp;                      /* queue pair */
363
    struct ibv_comp_channel *comp_channel;  /* completion channel */
364
    struct ibv_pd *pd;                      /* protection domain */
365
    struct ibv_cq *cq;                      /* completion queue */
366

    
367
    /*
368
     * If a previous write failed (perhaps because of a failed
369
     * memory registration), then do not attempt any future work
370
     * and remember the error state.
371
     */
372
    int error_state;
373
    int error_reported;
374

    
375
    /*
376
     * Description of ram blocks used throughout the code.
377
     */
378
    RDMALocalBlocks local_ram_blocks;
379
    RDMARemoteBlock *block;
380

    
381
    /*
382
     * Set once migration has started on the *destination*:
383
     * from then on, use the coroutine yield function when waiting.
384
     * The source runs in a separate thread, so we don't care there.
385
     */
386
    int migration_started_on_destination;
387

    
388
    int total_registrations;
389
    int total_writes;
390

    
391
    int unregister_current, unregister_next;
392
    uint64_t unregistrations[RDMA_SIGNALED_SEND_MAX];
393

    
394
    GHashTable *blockmap;
395
} RDMAContext;
396

    
397
/*
398
 * Interface to the rest of the migration call stack.
399
 */
400
typedef struct QEMUFileRDMA {
401
    RDMAContext *rdma;
402
    size_t len;
403
    void *file;
404
} QEMUFileRDMA;
405

    
406
/*
407
 * Main structure for IB Send/Recv control messages.
408
 * This gets prepended at the beginning of every Send/Recv.
409
 */
410
typedef struct QEMU_PACKED {
411
    uint32_t len;     /* Total length of data portion */
412
    uint32_t type;    /* which control command to perform */
413
    uint32_t repeat;  /* number of commands in data portion of same type */
414
    uint32_t padding;
415
} RDMAControlHeader;
416

    
417
static void control_to_network(RDMAControlHeader *control)
418
{
419
    control->type = htonl(control->type);
420
    control->len = htonl(control->len);
421
    control->repeat = htonl(control->repeat);
422
}
423

    
424
static void network_to_control(RDMAControlHeader *control)
425
{
426
    control->type = ntohl(control->type);
427
    control->len = ntohl(control->len);
428
    control->repeat = ntohl(control->repeat);
429
}
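
/*
 * Wire format example: a control message with one payload element is sent
 * as an RDMAControlHeader { .len = payload size in bytes, .type = one of
 * RDMA_CONTROL_*, .repeat = 1 } followed immediately by the payload, with
 * the header fields converted by control_to_network() before posting.
 */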
430

    
431
/*
432
 * Register a single Chunk.
433
 * Information sent by the source VM to inform the dest
434
 * to register a single chunk of memory before we can perform
435
 * the actual RDMA operation.
436
 */
437
typedef struct QEMU_PACKED {
438
    union QEMU_PACKED {
439
        uint64_t current_addr;  /* offset into the ramblock of the chunk */
440
        uint64_t chunk;         /* chunk to lookup if unregistering */
441
    } key;
442
    uint32_t current_index; /* which ramblock the chunk belongs to */
443
    uint32_t padding;
444
    uint64_t chunks;            /* how many sequential chunks to register */
445
} RDMARegister;
446

    
447
static void register_to_network(RDMARegister *reg)
448
{
449
    reg->key.current_addr = htonll(reg->key.current_addr);
450
    reg->current_index = htonl(reg->current_index);
451
    reg->chunks = htonll(reg->chunks);
452
}
453

    
454
static void network_to_register(RDMARegister *reg)
455
{
456
    reg->key.current_addr = ntohll(reg->key.current_addr);
457
    reg->current_index = ntohl(reg->current_index);
458
    reg->chunks = ntohll(reg->chunks);
459
}
460

    
461
typedef struct QEMU_PACKED {
462
    uint32_t value;     /* if zero, we will madvise() */
463
    uint32_t block_idx; /* which ram block index */
464
    uint64_t offset;    /* where in the remote ramblock this chunk */
465
    uint64_t length;    /* length of the chunk */
466
} RDMACompress;
467

    
468
static void compress_to_network(RDMACompress *comp)
469
{
470
    comp->value = htonl(comp->value);
471
    comp->block_idx = htonl(comp->block_idx);
472
    comp->offset = htonll(comp->offset);
473
    comp->length = htonll(comp->length);
474
}
475

    
476
static void network_to_compress(RDMACompress *comp)
477
{
478
    comp->value = ntohl(comp->value);
479
    comp->block_idx = ntohl(comp->block_idx);
480
    comp->offset = ntohll(comp->offset);
481
    comp->length = ntohll(comp->length);
482
}
483

    
484
/*
485
 * The result of the dest's memory registration produces an "rkey"
486
 * which the source VM must reference in order to perform
487
 * the RDMA operation.
488
 */
489
typedef struct QEMU_PACKED {
490
    uint32_t rkey;
491
    uint32_t padding;
492
    uint64_t host_addr;
493
} RDMARegisterResult;
494

    
495
static void result_to_network(RDMARegisterResult *result)
496
{
497
    result->rkey = htonl(result->rkey);
498
    result->host_addr = htonll(result->host_addr);
499
};
500

    
501
static void network_to_result(RDMARegisterResult *result)
502
{
503
    result->rkey = ntohl(result->rkey);
504
    result->host_addr = ntohll(result->host_addr);
505
};
506

    
507
const char *print_wrid(int wrid);
508
static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
509
                                   uint8_t *data, RDMAControlHeader *resp,
510
                                   int *resp_idx,
511
                                   int (*callback)(RDMAContext *rdma));
512

    
513
static inline uint64_t ram_chunk_index(uint8_t *start, uint8_t *host)
514
{
515
    return ((uintptr_t) host - (uintptr_t) start) >> RDMA_REG_CHUNK_SHIFT;
516
}
517

    
518
static inline uint8_t *ram_chunk_start(RDMALocalBlock *rdma_ram_block,
519
                                       uint64_t i)
520
{
521
    return (uint8_t *) (((uintptr_t) rdma_ram_block->local_host_addr)
522
                                    + (i << RDMA_REG_CHUNK_SHIFT));
523
}
524

    
525
static inline uint8_t *ram_chunk_end(RDMALocalBlock *rdma_ram_block, uint64_t i)
526
{
527
    uint8_t *result = ram_chunk_start(rdma_ram_block, i) +
528
                                         (1UL << RDMA_REG_CHUNK_SHIFT);
529

    
530
    if (result > (rdma_ram_block->local_host_addr + rdma_ram_block->length)) {
531
        result = rdma_ram_block->local_host_addr + rdma_ram_block->length;
532
    }
533

    
534
    return result;
535
}
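
/*
 * With RDMA_REG_CHUNK_SHIFT of 20, chunks are 1 MB. For example, a RAMBlock
 * of 2.5 MB spans chunk indexes 0 through 2, and ram_chunk_end() clamps the
 * last chunk so it does not run past the end of the block.
 */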
536

    
537
static int __qemu_rdma_add_block(RDMAContext *rdma, void *host_addr,
538
                         ram_addr_t block_offset, uint64_t length)
539
{
540
    RDMALocalBlocks *local = &rdma->local_ram_blocks;
541
    RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
542
        (void *) block_offset);
543
    RDMALocalBlock *old = local->block;
544

    
545
    assert(block == NULL);
546

    
547
    local->block = g_malloc0(sizeof(RDMALocalBlock) * (local->nb_blocks + 1));
548

    
549
    if (local->nb_blocks) {
550
        int x;
551

    
552
        for (x = 0; x < local->nb_blocks; x++) {
553
            g_hash_table_remove(rdma->blockmap, (void *)old[x].offset);
554
            g_hash_table_insert(rdma->blockmap, (void *)old[x].offset,
555
                                                &local->block[x]);
556
        }
557
        memcpy(local->block, old, sizeof(RDMALocalBlock) * local->nb_blocks);
558
        g_free(old);
559
    }
560

    
561
    block = &local->block[local->nb_blocks];
562

    
563
    block->local_host_addr = host_addr;
564
    block->offset = block_offset;
565
    block->length = length;
566
    block->index = local->nb_blocks;
567
    block->nb_chunks = ram_chunk_index(host_addr, host_addr + length) + 1UL;
568
    block->transit_bitmap = bitmap_new(block->nb_chunks);
569
    bitmap_clear(block->transit_bitmap, 0, block->nb_chunks);
570
    block->unregister_bitmap = bitmap_new(block->nb_chunks);
571
    bitmap_clear(block->unregister_bitmap, 0, block->nb_chunks);
572
    block->remote_keys = g_malloc0(block->nb_chunks * sizeof(uint32_t));
573

    
574
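    /* Blocks added during the initial enumeration are RAM blocks;
       anything added after init completes is not. */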
    block->is_ram_block = local->init ? false : true;
575

    
576
    g_hash_table_insert(rdma->blockmap, (void *) block_offset, block);
577

    
578
    DDPRINTF("Added Block: %d, addr: %" PRIu64 ", offset: %" PRIu64
579
           " length: %" PRIu64 " end: %" PRIu64 " bits %" PRIu64 " chunks %d\n",
580
            local->nb_blocks, (uint64_t) block->local_host_addr, block->offset,
581
            block->length, (uint64_t) (block->local_host_addr + block->length),
582
                BITS_TO_LONGS(block->nb_chunks) *
583
                    sizeof(unsigned long) * 8, block->nb_chunks);
584

    
585
    local->nb_blocks++;
586

    
587
    return 0;
588
}
589

    
590
/*
591
 * Memory regions need to be registered with the device and queue pairs set up
592
 * in advance before the migration starts. This tells us where the RAM blocks
593
 * are so that we can register them individually.
594
 */
595
static void qemu_rdma_init_one_block(void *host_addr,
596
    ram_addr_t block_offset, ram_addr_t length, void *opaque)
597
{
598
    __qemu_rdma_add_block(opaque, host_addr, block_offset, length);
599
}
600

    
601
/*
602
 * Identify the RAMBlocks and their quantity. They will be used to
603
 * identify chunk boundaries inside each RAMBlock and also be referenced
604
 * during dynamic page registration.
605
 */
606
static int qemu_rdma_init_ram_blocks(RDMAContext *rdma)
607
{
608
    RDMALocalBlocks *local = &rdma->local_ram_blocks;
609

    
610
    assert(rdma->blockmap == NULL);
611
    rdma->blockmap = g_hash_table_new(g_direct_hash, g_direct_equal);
612
    memset(local, 0, sizeof *local);
613
    qemu_ram_foreach_block(qemu_rdma_init_one_block, rdma);
614
    DPRINTF("Allocated %d local ram block structures\n", local->nb_blocks);
615
    rdma->block = (RDMARemoteBlock *) g_malloc0(sizeof(RDMARemoteBlock) *
616
                        rdma->local_ram_blocks.nb_blocks);
617
    local->init = true;
618
    return 0;
619
}
620

    
621
static int __qemu_rdma_delete_block(RDMAContext *rdma, ram_addr_t block_offset)
622
{
623
    RDMALocalBlocks *local = &rdma->local_ram_blocks;
624
    RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
625
        (void *) block_offset);
626
    RDMALocalBlock *old = local->block;
627
    int x;
628

    
629
    assert(block);
630

    
631
    if (block->pmr) {
632
        int j;
633

    
634
        for (j = 0; j < block->nb_chunks; j++) {
635
            if (!block->pmr[j]) {
636
                continue;
637
            }
638
            ibv_dereg_mr(block->pmr[j]);
639
            rdma->total_registrations--;
640
        }
641
        g_free(block->pmr);
642
        block->pmr = NULL;
643
    }
644

    
645
    if (block->mr) {
646
        ibv_dereg_mr(block->mr);
647
        rdma->total_registrations--;
648
        block->mr = NULL;
649
    }
650

    
651
    g_free(block->transit_bitmap);
652
    block->transit_bitmap = NULL;
653

    
654
    g_free(block->unregister_bitmap);
655
    block->unregister_bitmap = NULL;
656

    
657
    g_free(block->remote_keys);
658
    block->remote_keys = NULL;
659

    
660
    for (x = 0; x < local->nb_blocks; x++) {
661
        g_hash_table_remove(rdma->blockmap, (void *)old[x].offset);
662
    }
663

    
664
    if (local->nb_blocks > 1) {
665

    
666
        local->block = g_malloc0(sizeof(RDMALocalBlock) *
667
                                    (local->nb_blocks - 1));
668

    
669
        if (block->index) {
670
            memcpy(local->block, old, sizeof(RDMALocalBlock) * block->index);
671
        }
672

    
673
        if (block->index < (local->nb_blocks - 1)) {
674
            memcpy(local->block + block->index, old + (block->index + 1),
675
                sizeof(RDMALocalBlock) *
676
                    (local->nb_blocks - (block->index + 1)));
677
        }
678
    } else {
679
        assert(block == local->block);
680
        local->block = NULL;
681
    }
682

    
683
    DDPRINTF("Deleted Block: %d, addr: %" PRIu64 ", offset: %" PRIu64
684
           " length: %" PRIu64 " end: %" PRIu64 " bits %" PRIu64 " chunks %d\n",
685
            local->nb_blocks, (uint64_t) block->local_host_addr, block->offset,
686
            block->length, (uint64_t) (block->local_host_addr + block->length),
687
                BITS_TO_LONGS(block->nb_chunks) *
688
                    sizeof(unsigned long) * 8, block->nb_chunks);
689

    
690
    g_free(old);
691

    
692
    local->nb_blocks--;
693

    
694
    if (local->nb_blocks) {
695
        for (x = 0; x < local->nb_blocks; x++) {
696
            g_hash_table_insert(rdma->blockmap, (void *)local->block[x].offset,
697
                                                &local->block[x]);
698
        }
699
    }
700

    
701
    return 0;
702
}
703

    
704
/*
705
 * Put in the log file which RDMA device was opened and the details
706
 * associated with that device.
707
 */
708
static void qemu_rdma_dump_id(const char *who, struct ibv_context *verbs)
709
{
710
    printf("%s RDMA Device opened: kernel name %s "
711
           "uverbs device name %s, "
712
           "infiniband_verbs class device path %s,"
713
           " infiniband class device path %s\n",
714
                who,
715
                verbs->device->name,
716
                verbs->device->dev_name,
717
                verbs->device->dev_path,
718
                verbs->device->ibdev_path);
719
}
720

    
721
/*
722
 * Put in the log file the RDMA gid addressing information,
723
 * useful for folks who have trouble understanding the
724
 * RDMA device hierarchy in the kernel.
725
 */
726
static void qemu_rdma_dump_gid(const char *who, struct rdma_cm_id *id)
727
{
728
    char sgid[33];
729
    char dgid[33];
730
    inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.sgid, sgid, sizeof sgid);
731
    inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.dgid, dgid, sizeof dgid);
732
    DPRINTF("%s Source GID: %s, Dest GID: %s\n", who, sgid, dgid);
733
}
734

    
735
/*
736
 * Figure out which RDMA device corresponds to the requested IP hostname.
737
 * Also create the initial connection manager identifiers for opening
738
 * the connection.
739
 */
740
static int qemu_rdma_resolve_host(RDMAContext *rdma, Error **errp)
741
{
742
    int ret;
743
    struct addrinfo *res;
744
    char port_str[16];
745
    struct rdma_cm_event *cm_event;
746
    char ip[40] = "unknown";
747

    
748
    if (rdma->host == NULL || !strcmp(rdma->host, "")) {
749
        ERROR(errp, "RDMA hostname has not been set\n");
750
        return -1;
751
    }
752

    
753
    /* create CM channel */
754
    rdma->channel = rdma_create_event_channel();
755
    if (!rdma->channel) {
756
        ERROR(errp, "could not create CM channel\n");
757
        return -1;
758
    }
759

    
760
    /* create CM id */
761
    ret = rdma_create_id(rdma->channel, &rdma->cm_id, NULL, RDMA_PS_TCP);
762
    if (ret) {
763
        ERROR(errp, "could not create channel id\n");
764
        goto err_resolve_create_id;
765
    }
766

    
767
    snprintf(port_str, 16, "%d", rdma->port);
768
    port_str[15] = '\0';
769

    
770
    ret = getaddrinfo(rdma->host, port_str, NULL, &res);
771
    if (ret < 0) {
772
        ERROR(errp, "could not getaddrinfo address %s\n", rdma->host);
773
        goto err_resolve_get_addr;
774
    }
775

    
776
    inet_ntop(AF_INET, &((struct sockaddr_in *) res->ai_addr)->sin_addr,
777
                                ip, sizeof ip);
778
    DPRINTF("%s => %s\n", rdma->host, ip);
779

    
780
    /* resolve the first address */
781
    ret = rdma_resolve_addr(rdma->cm_id, NULL, res->ai_addr,
782
            RDMA_RESOLVE_TIMEOUT_MS);
783
    if (ret) {
784
        ERROR(errp, "could not resolve address %s\n", rdma->host);
785
        goto err_resolve_get_addr;
786
    }
787

    
788
    qemu_rdma_dump_gid("source_resolve_addr", rdma->cm_id);
789

    
790
    ret = rdma_get_cm_event(rdma->channel, &cm_event);
791
    if (ret) {
792
        ERROR(errp, "could not perform event_addr_resolved\n");
793
        goto err_resolve_get_addr;
794
    }
795

    
796
    if (cm_event->event != RDMA_CM_EVENT_ADDR_RESOLVED) {
797
        ERROR(errp, "result not equal to event_addr_resolved %s\n",
798
                rdma_event_str(cm_event->event));
799
        perror("rdma_resolve_addr");
800
        goto err_resolve_get_addr;
801
    }
802
    rdma_ack_cm_event(cm_event);
803

    
804
    /* resolve route */
805
    ret = rdma_resolve_route(rdma->cm_id, RDMA_RESOLVE_TIMEOUT_MS);
806
    if (ret) {
807
        ERROR(errp, "could not resolve rdma route\n");
808
        goto err_resolve_get_addr;
809
    }
810

    
811
    ret = rdma_get_cm_event(rdma->channel, &cm_event);
812
    if (ret) {
813
        ERROR(errp, "could not perform event_route_resolved\n");
814
        goto err_resolve_get_addr;
815
    }
816
    if (cm_event->event != RDMA_CM_EVENT_ROUTE_RESOLVED) {
817
        ERROR(errp, "result not equal to event_route_resolved: %s\n",
818
                        rdma_event_str(cm_event->event));
819
        rdma_ack_cm_event(cm_event);
820
        goto err_resolve_get_addr;
821
    }
822
    rdma_ack_cm_event(cm_event);
823
    rdma->verbs = rdma->cm_id->verbs;
824
    qemu_rdma_dump_id("source_resolve_host", rdma->cm_id->verbs);
825
    qemu_rdma_dump_gid("source_resolve_host", rdma->cm_id);
826
    return 0;
827

    
828
err_resolve_get_addr:
829
    rdma_destroy_id(rdma->cm_id);
830
    rdma->cm_id = NULL;
831
err_resolve_create_id:
832
    rdma_destroy_event_channel(rdma->channel);
833
    rdma->channel = NULL;
834

    
835
    return -1;
836
}
837

    
838
/*
839
 * Create protection domain and completion queues
840
 */
841
static int qemu_rdma_alloc_pd_cq(RDMAContext *rdma)
842
{
843
    /* allocate pd */
844
    rdma->pd = ibv_alloc_pd(rdma->verbs);
845
    if (!rdma->pd) {
846
        fprintf(stderr, "failed to allocate protection domain\n");
847
        return -1;
848
    }
849

    
850
    /* create completion channel */
851
    rdma->comp_channel = ibv_create_comp_channel(rdma->verbs);
852
    if (!rdma->comp_channel) {
853
        fprintf(stderr, "failed to allocate completion channel\n");
854
        goto err_alloc_pd_cq;
855
    }
856

    
857
    /*
858
     * Completion queue can be filled by both read and write work requests,
859
     * so must reflect the sum of both possible queue sizes.
860
     */
861
    rdma->cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3),
862
            NULL, rdma->comp_channel, 0);
863
    if (!rdma->cq) {
864
        fprintf(stderr, "failed to allocate completion queue\n");
865
        goto err_alloc_pd_cq;
866
    }
867

    
868
    return 0;
869

    
870
err_alloc_pd_cq:
871
    if (rdma->pd) {
872
        ibv_dealloc_pd(rdma->pd);
873
    }
874
    if (rdma->comp_channel) {
875
        ibv_destroy_comp_channel(rdma->comp_channel);
876
    }
877
    rdma->pd = NULL;
878
    rdma->comp_channel = NULL;
879
    return -1;
880

    
881
}
882

    
883
/*
884
 * Create queue pairs.
885
 */
886
static int qemu_rdma_alloc_qp(RDMAContext *rdma)
887
{
888
    struct ibv_qp_init_attr attr = { 0 };
889
    int ret;
890

    
891
    attr.cap.max_send_wr = RDMA_SIGNALED_SEND_MAX;
892
    attr.cap.max_recv_wr = 3;
893
    attr.cap.max_send_sge = 1;
894
    attr.cap.max_recv_sge = 1;
895
    attr.send_cq = rdma->cq;
896
    attr.recv_cq = rdma->cq;
897
    attr.qp_type = IBV_QPT_RC;
898

    
899
    ret = rdma_create_qp(rdma->cm_id, rdma->pd, &attr);
900
    if (ret) {
901
        return -1;
902
    }
903

    
904
    rdma->qp = rdma->cm_id->qp;
905
    return 0;
906
}
907

    
908
static int qemu_rdma_reg_whole_ram_blocks(RDMAContext *rdma)
909
{
910
    int i;
911
    RDMALocalBlocks *local = &rdma->local_ram_blocks;
912

    
913
    for (i = 0; i < local->nb_blocks; i++) {
914
        local->block[i].mr =
915
            ibv_reg_mr(rdma->pd,
916
                    local->block[i].local_host_addr,
917
                    local->block[i].length,
918
                    IBV_ACCESS_LOCAL_WRITE |
919
                    IBV_ACCESS_REMOTE_WRITE
920
                    );
921
        if (!local->block[i].mr) {
922
            perror("Failed to register local dest ram block!\n");
923
            break;
924
        }
925
        rdma->total_registrations++;
926
    }
927

    
928
    if (i >= local->nb_blocks) {
929
        return 0;
930
    }
931

    
932
    for (i--; i >= 0; i--) {
933
        ibv_dereg_mr(local->block[i].mr);
934
        rdma->total_registrations--;
935
    }
936

    
937
    return -1;
938

    
939
}
940

    
941
/*
942
 * Find the ram block that corresponds to the page requested to be
943
 * transmitted by QEMU.
944
 *
945
 * Once the block is found, also identify which 'chunk' within that
946
 * block that the page belongs to.
947
 *
948
 * This search cannot fail or the migration will fail.
949
 */
950
static int qemu_rdma_search_ram_block(RDMAContext *rdma,
951
                                      uint64_t block_offset,
952
                                      uint64_t offset,
953
                                      uint64_t length,
954
                                      uint64_t *block_index,
955
                                      uint64_t *chunk_index)
956
{
957
    uint64_t current_addr = block_offset + offset;
958
    RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
959
                                                (void *) block_offset);
960
    assert(block);
961
    assert(current_addr >= block->offset);
962
    assert((current_addr + length) <= (block->offset + block->length));
963

    
964
    *block_index = block->index;
965
    *chunk_index = ram_chunk_index(block->local_host_addr,
966
                block->local_host_addr + (current_addr - block->offset));
967

    
968
    return 0;
969
}
970

    
971
/*
972
 * Register a chunk with IB. If the chunk was already registered
973
 * previously, then skip.
974
 *
975
 * Also return the keys associated with the registration needed
976
 * to perform the actual RDMA operation.
977
 */
978
static int qemu_rdma_register_and_get_keys(RDMAContext *rdma,
979
        RDMALocalBlock *block, uint8_t *host_addr,
980
        uint32_t *lkey, uint32_t *rkey, int chunk,
981
        uint8_t *chunk_start, uint8_t *chunk_end)
982
{
983
    if (block->mr) {
984
        if (lkey) {
985
            *lkey = block->mr->lkey;
986
        }
987
        if (rkey) {
988
            *rkey = block->mr->rkey;
989
        }
990
        return 0;
991
    }
992

    
993
    /* allocate memory to store chunk MRs */
994
    if (!block->pmr) {
995
        block->pmr = g_malloc0(block->nb_chunks * sizeof(struct ibv_mr *));
996
        if (!block->pmr) {
997
            return -1;
998
        }
999
    }
1000

    
1001
    /*
1002
     * If 'rkey', then we're the destination, so grant access to the source.
1003
     *
1004
     * If 'lkey', then we're the source VM, so grant access only to ourselves.
1005
     */
1006
    if (!block->pmr[chunk]) {
1007
        uint64_t len = chunk_end - chunk_start;
1008

    
1009
        DDPRINTF("Registering %" PRIu64 " bytes @ %p\n",
1010
                 len, chunk_start);
1011

    
1012
        block->pmr[chunk] = ibv_reg_mr(rdma->pd,
1013
                chunk_start, len,
1014
                (rkey ? (IBV_ACCESS_LOCAL_WRITE |
1015
                        IBV_ACCESS_REMOTE_WRITE) : 0));
1016

    
1017
        if (!block->pmr[chunk]) {
1018
            perror("Failed to register chunk!");
1019
            fprintf(stderr, "Chunk details: block: %d chunk index %d"
1020
                            " start %" PRIu64 " end %" PRIu64 " host %" PRIu64
1021
                            " local %" PRIu64 " registrations: %d\n",
1022
                            block->index, chunk, (uint64_t) chunk_start,
1023
                            (uint64_t) chunk_end, (uint64_t) host_addr,
1024
                            (uint64_t) block->local_host_addr,
1025
                            rdma->total_registrations);
1026
            return -1;
1027
        }
1028
        rdma->total_registrations++;
1029
    }
1030

    
1031
    if (lkey) {
1032
        *lkey = block->pmr[chunk]->lkey;
1033
    }
1034
    if (rkey) {
1035
        *rkey = block->pmr[chunk]->rkey;
1036
    }
1037
    return 0;
1038
}
1039

    
1040
/*
1041
 * Register (at connection time) the memory used for control
1042
 * channel messages.
1043
 */
1044
static int qemu_rdma_reg_control(RDMAContext *rdma, int idx)
1045
{
1046
    rdma->wr_data[idx].control_mr = ibv_reg_mr(rdma->pd,
1047
            rdma->wr_data[idx].control, RDMA_CONTROL_MAX_BUFFER,
1048
            IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
1049
    if (rdma->wr_data[idx].control_mr) {
1050
        rdma->total_registrations++;
1051
        return 0;
1052
    }
1053
    fprintf(stderr, "qemu_rdma_reg_control failed!\n");
1054
    return -1;
1055
}
1056

    
1057
const char *print_wrid(int wrid)
1058
{
1059
    if (wrid >= RDMA_WRID_RECV_CONTROL) {
1060
        return wrid_desc[RDMA_WRID_RECV_CONTROL];
1061
    }
1062
    return wrid_desc[wrid];
1063
}
1064

    
1065
/*
1066
 * RDMA requires memory registration (mlock/pinning), but this is not good for
1067
 * overcommitment.
1068
 *
1069
 * In preparation for the future where LRU information or workload-specific
1070
 * writable writable working set memory access behavior is available to QEMU
1071
 * it would be nice to have in place the ability to UN-register/UN-pin
1072
 * particular memory regions from the RDMA hardware when it is determine that
1073
 * those regions of memory will likely not be accessed again in the near future.
1074
 *
1075
 * While we do not yet have such information right now, the following
1076
 * compile-time option allows us to perform a non-optimized version of this
1077
 * behavior.
1078
 *
1079
 * By uncommenting this option, you will cause *all* RDMA transfers to be
1080
 * unregistered immediately after the transfer completes on both sides of the
1081
 * connection. This has no effect in 'rdma-pin-all' mode, only regular mode.
1082
 *
1083
 * This will have a terrible impact on migration performance, so until future
1084
 * workload information or LRU information is available, do not attempt to use
1085
 * this feature except for basic testing.
1086
 */
1087
//#define RDMA_UNREGISTRATION_EXAMPLE
1088

    
1089
/*
1090
 * Perform a non-optimized memory unregistration after every transfer
1091
 * for demonstration purposes, only if pin-all is not requested.
1092
 *
1093
 * Potential optimizations:
1094
 * 1. Start a new thread to run this function continuously
1095
 *      - for bit clearing
1096
 *      - and for receipt of unregister messages
1097
 * 2. Use an LRU.
1098
 * 3. Use workload hints.
1099
 */
1100
static int qemu_rdma_unregister_waiting(RDMAContext *rdma)
1101
{
1102
    while (rdma->unregistrations[rdma->unregister_current]) {
1103
        int ret;
1104
        uint64_t wr_id = rdma->unregistrations[rdma->unregister_current];
1105
        uint64_t chunk =
1106
            (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
1107
        uint64_t index =
1108
            (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
1109
        RDMALocalBlock *block =
1110
            &(rdma->local_ram_blocks.block[index]);
1111
        RDMARegister reg = { .current_index = index };
1112
        RDMAControlHeader resp = { .type = RDMA_CONTROL_UNREGISTER_FINISHED,
1113
                                 };
1114
        RDMAControlHeader head = { .len = sizeof(RDMARegister),
1115
                                   .type = RDMA_CONTROL_UNREGISTER_REQUEST,
1116
                                   .repeat = 1,
1117
                                 };
1118

    
1119
        DDPRINTF("Processing unregister for chunk: %" PRIu64
1120
                 " at position %d\n", chunk, rdma->unregister_current);
1121

    
1122
        rdma->unregistrations[rdma->unregister_current] = 0;
1123
        rdma->unregister_current++;
1124

    
1125
        if (rdma->unregister_current == RDMA_SIGNALED_SEND_MAX) {
1126
            rdma->unregister_current = 0;
1127
        }
1128

    
1129

    
1130
        /*
1131
         * Unregistration is speculative (because migration is single-threaded
1132
         * and we cannot break the protocol's inifinband message ordering).
1133
         * Thus, if the memory is currently being used for transmission,
1134
         * then abort the attempt to unregister and try again
1135
         * later the next time a completion is received for this memory.
1136
         */
1137
        clear_bit(chunk, block->unregister_bitmap);
1138

    
1139
        if (test_bit(chunk, block->transit_bitmap)) {
1140
            DDPRINTF("Cannot unregister inflight chunk: %" PRIu64 "\n", chunk);
1141
            continue;
1142
        }
1143

    
1144
        DDPRINTF("Sending unregister for chunk: %" PRIu64 "\n", chunk);
1145

    
1146
        ret = ibv_dereg_mr(block->pmr[chunk]);
1147
        block->pmr[chunk] = NULL;
1148
        block->remote_keys[chunk] = 0;
1149

    
1150
        if (ret != 0) {
1151
            perror("unregistration chunk failed");
1152
            return -ret;
1153
        }
1154
        rdma->total_registrations--;
1155

    
1156
        reg.key.chunk = chunk;
1157
        register_to_network(&reg);
1158
        ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
1159
                                &resp, NULL, NULL);
1160
        if (ret < 0) {
1161
            return ret;
1162
        }
1163

    
1164
        DDPRINTF("Unregister for chunk: %" PRIu64 " complete.\n", chunk);
1165
    }
1166

    
1167
    return 0;
1168
}
1169

    
1170
static uint64_t qemu_rdma_make_wrid(uint64_t wr_id, uint64_t index,
1171
                                         uint64_t chunk)
1172
{
1173
    uint64_t result = wr_id & RDMA_WRID_TYPE_MASK;
1174

    
1175
    result |= (index << RDMA_WRID_BLOCK_SHIFT);
1176
    result |= (chunk << RDMA_WRID_CHUNK_SHIFT);
1177

    
1178
    return result;
1179
}
1180

    
1181
/*
1182
 * Set bit for unregistration in the next iteration.
1183
 * We cannot transmit right here, but will unpin later.
1184
 */
1185
static void qemu_rdma_signal_unregister(RDMAContext *rdma, uint64_t index,
1186
                                        uint64_t chunk, uint64_t wr_id)
1187
{
1188
    if (rdma->unregistrations[rdma->unregister_next] != 0) {
1189
        fprintf(stderr, "rdma migration: queue is full!\n");
1190
    } else {
1191
        RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
1192

    
1193
        if (!test_and_set_bit(chunk, block->unregister_bitmap)) {
1194
            DDPRINTF("Appending unregister chunk %" PRIu64
1195
                    " at position %d\n", chunk, rdma->unregister_next);
1196

    
1197
            rdma->unregistrations[rdma->unregister_next++] =
1198
                    qemu_rdma_make_wrid(wr_id, index, chunk);
1199

    
1200
            if (rdma->unregister_next == RDMA_SIGNALED_SEND_MAX) {
1201
                rdma->unregister_next = 0;
1202
            }
1203
        } else {
1204
            DDPRINTF("Unregister chunk %" PRIu64 " already in queue.\n",
1205
                    chunk);
1206
        }
1207
    }
1208
}
1209

    
1210
/*
1211
 * Consult the completion queue to see if a work request
1212
 * (of any kind) has completed.
1213
 * Return the work request ID that completed.
1214
 */
1215
static uint64_t qemu_rdma_poll(RDMAContext *rdma, uint64_t *wr_id_out)
1216
{
1217
    int ret;
1218
    struct ibv_wc wc;
1219
    uint64_t wr_id;
1220

    
1221
    ret = ibv_poll_cq(rdma->cq, 1, &wc);
1222

    
1223
    if (!ret) {
1224
        *wr_id_out = RDMA_WRID_NONE;
1225
        return 0;
1226
    }
1227

    
1228
    if (ret < 0) {
1229
        fprintf(stderr, "ibv_poll_cq return %d!\n", ret);
1230
        return ret;
1231
    }
1232

    
1233
    wr_id = wc.wr_id & RDMA_WRID_TYPE_MASK;
1234

    
1235
    if (wc.status != IBV_WC_SUCCESS) {
1236
        fprintf(stderr, "ibv_poll_cq wc.status=%d %s!\n",
1237
                        wc.status, ibv_wc_status_str(wc.status));
1238
        fprintf(stderr, "ibv_poll_cq wrid=%s!\n", wrid_desc[wr_id]);
1239

    
1240
        return -1;
1241
    }
1242

    
1243
    if (rdma->control_ready_expected &&
1244
        (wr_id >= RDMA_WRID_RECV_CONTROL)) {
1245
        DDDPRINTF("completion %s #%" PRId64 " received (%" PRId64 ")"
1246
                  " left %d\n", wrid_desc[RDMA_WRID_RECV_CONTROL],
1247
                  wr_id - RDMA_WRID_RECV_CONTROL, wr_id, rdma->nb_sent);
1248
        rdma->control_ready_expected = 0;
1249
    }
1250

    
1251
    if (wr_id == RDMA_WRID_RDMA_WRITE) {
1252
        uint64_t chunk =
1253
            (wc.wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
1254
        uint64_t index =
1255
            (wc.wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
1256
        RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
1257

    
1258
        DDDPRINTF("completions %s (%" PRId64 ") left %d, "
1259
                 "block %" PRIu64 ", chunk: %" PRIu64 " %p %p\n",
1260
                 print_wrid(wr_id), wr_id, rdma->nb_sent, index, chunk,
1261
                 block->local_host_addr, (void *)block->remote_host_addr);
1262

    
1263
        clear_bit(chunk, block->transit_bitmap);
1264

    
1265
        if (rdma->nb_sent > 0) {
1266
            rdma->nb_sent--;
1267
        }
1268

    
1269
        if (!rdma->pin_all) {
1270
            /*
1271
             * FYI: If one wanted to signal a specific chunk to be unregistered
1272
             * using LRU or workload-specific information, this is the function
1273
             * you would call to do so. That chunk would then get asynchronously
1274
             * unregistered later.
1275
             */
1276
#ifdef RDMA_UNREGISTRATION_EXAMPLE
1277
            qemu_rdma_signal_unregister(rdma, index, chunk, wc.wr_id);
1278
#endif
1279
        }
1280
    } else {
1281
        DDDPRINTF("other completion %s (%" PRId64 ") received left %d\n",
1282
            print_wrid(wr_id), wr_id, rdma->nb_sent);
1283
    }
1284

    
1285
    *wr_id_out = wc.wr_id;
1286

    
1287
    return  0;
1288
}
1289

    
1290
/*
1291
 * Block until the next work request has completed.
1292
 *
1293
 * First poll to see if a work request has already completed,
1294
 * otherwise block.
1295
 *
1296
 * If we encounter completed work requests for IDs other than
1297
 * the one we're interested in, then that's generally an error.
1298
 *
1299
 * The only exception is actual RDMA Write completions. These
1300
 * completions only need to be recorded, but do not actually
1301
 * need further processing.
1302
 */
1303
static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid_requested)
1304
{
1305
    int num_cq_events = 0, ret = 0;
1306
    struct ibv_cq *cq;
1307
    void *cq_ctx;
1308
    uint64_t wr_id = RDMA_WRID_NONE, wr_id_in;
1309

    
1310
    if (ibv_req_notify_cq(rdma->cq, 0)) {
1311
        return -1;
1312
    }
1313
    /* poll cq first */
1314
    while (wr_id != wrid_requested) {
1315
        ret = qemu_rdma_poll(rdma, &wr_id_in);
1316
        if (ret < 0) {
1317
            return ret;
1318
        }
1319

    
1320
        wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
1321

    
1322
        if (wr_id == RDMA_WRID_NONE) {
1323
            break;
1324
        }
1325
        if (wr_id != wrid_requested) {
1326
            DDDPRINTF("A Wanted wrid %s (%d) but got %s (%" PRIu64 ")\n",
1327
                print_wrid(wrid_requested),
1328
                wrid_requested, print_wrid(wr_id), wr_id);
1329
        }
1330
    }
1331

    
1332
    if (wr_id == wrid_requested) {
1333
        return 0;
1334
    }
1335

    
1336
    while (1) {
1337
        /*
1338
         * Coroutine doesn't start until process_incoming_migration()
1339
         * so don't yield unless we know we're running inside of a coroutine.
1340
         */
1341
        if (rdma->migration_started_on_destination) {
1342
            yield_until_fd_readable(rdma->comp_channel->fd);
1343
        }
1344

    
1345
        if (ibv_get_cq_event(rdma->comp_channel, &cq, &cq_ctx)) {
1346
            perror("ibv_get_cq_event");
1347
            goto err_block_for_wrid;
1348
        }
1349

    
1350
        num_cq_events++;
1351

    
1352
        if (ibv_req_notify_cq(cq, 0)) {
1353
            goto err_block_for_wrid;
1354
        }
1355

    
1356
        while (wr_id != wrid_requested) {
1357
            ret = qemu_rdma_poll(rdma, &wr_id_in);
1358
            if (ret < 0) {
1359
                goto err_block_for_wrid;
1360
            }
1361

    
1362
            wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
1363

    
1364
            if (wr_id == RDMA_WRID_NONE) {
1365
                break;
1366
            }
1367
            if (wr_id != wrid_requested) {
1368
                DDDPRINTF("B Wanted wrid %s (%d) but got %s (%" PRIu64 ")\n",
1369
                    print_wrid(wrid_requested), wrid_requested,
1370
                    print_wrid(wr_id), wr_id);
1371
            }
1372
        }
1373

    
1374
        if (wr_id == wrid_requested) {
1375
            goto success_block_for_wrid;
1376
        }
1377
    }
1378

    
1379
success_block_for_wrid:
1380
    if (num_cq_events) {
1381
        ibv_ack_cq_events(cq, num_cq_events);
1382
    }
1383
    return 0;
1384

    
1385
err_block_for_wrid:
1386
    if (num_cq_events) {
1387
        ibv_ack_cq_events(cq, num_cq_events);
1388
    }
1389
    return ret;
1390
}
1391

    
1392
/*
1393
 * Post a SEND message work request for the control channel
1394
 * containing some data and block until the post completes.
1395
 */
1396
static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf,
1397
                                       RDMAControlHeader *head)
1398
{
1399
    int ret = 0;
1400
    RDMAWorkRequestData *wr = &rdma->wr_data[RDMA_WRID_MAX];
1401
    struct ibv_send_wr *bad_wr;
1402
    struct ibv_sge sge = {
1403
                           .addr = (uint64_t)(wr->control),
1404
                           .length = head->len + sizeof(RDMAControlHeader),
1405
                           .lkey = wr->control_mr->lkey,
1406
                         };
1407
    struct ibv_send_wr send_wr = {
1408
                                   .wr_id = RDMA_WRID_SEND_CONTROL,
1409
                                   .opcode = IBV_WR_SEND,
1410
                                   .send_flags = IBV_SEND_SIGNALED,
1411
                                   .sg_list = &sge,
1412
                                   .num_sge = 1,
1413
                                };
1414

    
1415
    DDDPRINTF("CONTROL: sending %s..\n", control_desc[head->type]);
1416

    
1417
    /*
1418
     * We don't actually need to do a memcpy() in here if we used
1419
     * the "sge" properly, but since we're only sending control messages
1420
     * (not RAM in a performance-critical path), then it's OK for now.
1421
     *
1422
     * The copy makes the RDMAControlHeader simpler to manipulate
1423
     * for the time being.
1424
     */
1425
    memcpy(wr->control, head, sizeof(RDMAControlHeader));
1426
    control_to_network((void *) wr->control);
1427

    
1428
    if (buf) {
1429
        memcpy(wr->control + sizeof(RDMAControlHeader), buf, head->len);
1430
    }
1431

    
1432

    
1433
    if (ibv_post_send(rdma->qp, &send_wr, &bad_wr)) {
1434
        return -1;
1435
    }
1436

    
1437
    if (ret < 0) {
1438
        fprintf(stderr, "Failed to use post IB SEND for control!\n");
1439
        return ret;
1440
    }
1441

    
1442
    ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_SEND_CONTROL);
1443
    if (ret < 0) {
1444
        fprintf(stderr, "rdma migration: send polling control error!\n");
1445
    }
1446

    
1447
    return ret;
1448
}
1449

    
1450
/*
1451
 * Post a RECV work request in anticipation of some future receipt
1452
 * of data on the control channel.
1453
 */
1454
static int qemu_rdma_post_recv_control(RDMAContext *rdma, int idx)
1455
{
1456
    struct ibv_recv_wr *bad_wr;
1457
    struct ibv_sge sge = {
1458
                            .addr = (uint64_t)(rdma->wr_data[idx].control),
1459
                            .length = RDMA_CONTROL_MAX_BUFFER,
1460
                            .lkey = rdma->wr_data[idx].control_mr->lkey,
1461
                         };
1462

    
1463
    struct ibv_recv_wr recv_wr = {
1464
                                    .wr_id = RDMA_WRID_RECV_CONTROL + idx,
1465
                                    .sg_list = &sge,
1466
                                    .num_sge = 1,
1467
                                 };
1468

    
1469

    
1470
    if (ibv_post_recv(rdma->qp, &recv_wr, &bad_wr)) {
1471
        return -1;
1472
    }
1473

    
1474
    return 0;
1475
}
1476

    
1477
/*
1478
 * Block and wait for a RECV control channel message to arrive.
1479
 */
1480
static int qemu_rdma_exchange_get_response(RDMAContext *rdma,
1481
                RDMAControlHeader *head, int expecting, int idx)
1482
{
1483
    int ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RECV_CONTROL + idx);
1484

    
1485
    if (ret < 0) {
1486
        fprintf(stderr, "rdma migration: recv polling control error!\n");
1487
        return ret;
1488
    }
1489

    
1490
    network_to_control((void *) rdma->wr_data[idx].control);
1491
    memcpy(head, rdma->wr_data[idx].control, sizeof(RDMAControlHeader));
1492

    
1493
    DDDPRINTF("CONTROL: %s receiving...\n", control_desc[expecting]);
1494

    
1495
    if (expecting == RDMA_CONTROL_NONE) {
1496
        DDDPRINTF("Surprise: got %s (%d)\n",
1497
                  control_desc[head->type], head->type);
1498
    } else if (head->type != expecting || head->type == RDMA_CONTROL_ERROR) {
1499
        fprintf(stderr, "Was expecting a %s (%d) control message"
1500
                ", but got: %s (%d), length: %d\n",
1501
                control_desc[expecting], expecting,
1502
                control_desc[head->type], head->type, head->len);
1503
        return -EIO;
1504
    }
1505

    
1506
    return 0;
1507
}
1508

    
1509
/*
1510
 * When a RECV work request has completed, the work request's
1511
 * buffer is pointed at the header.
1512
 *
1513
 * This will advance control_curr to the data portion
1514
 * of the control message in the work request's buffer, which
1515
 * was populated after the work request finished.
1516
 */
1517
static void qemu_rdma_move_header(RDMAContext *rdma, int idx,
1518
                                  RDMAControlHeader *head)
1519
{
1520
    rdma->wr_data[idx].control_len = head->len;
1521
    rdma->wr_data[idx].control_curr =
1522
        rdma->wr_data[idx].control + sizeof(RDMAControlHeader);
1523
}
1524

    
1525
/*
1526
 * This is an 'atomic' high-level operation to deliver a single, unified
1527
 * control-channel message.
1528
 *
1529
 * Additionally, if the user is expecting some kind of reply to this message,
1530
 * they can request a 'resp' response message be filled in by posting an
1531
 * additional work request on behalf of the user and waiting for an additional
1532
 * completion.
1533
 *
1534
 * The extra (optional) response is used during registration to save us from
1535
 * having to perform an *additional* exchange of messages just to provide a
1536
 * response, by instead piggy-backing on the acknowledgement.
1537
 */
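
/*
 * For example, the unregistration path above issues a single call that
 * sends an UNREGISTER_REQUEST header plus an RDMARegister payload and
 * waits for the UNREGISTER_FINISHED response:
 *
 *   qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg, &resp, NULL, NULL);
 */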
1538
static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
1539
                                   uint8_t *data, RDMAControlHeader *resp,
1540
                                   int *resp_idx,
1541
                                   int (*callback)(RDMAContext *rdma))
1542
{
1543
    int ret = 0;
1544

    
1545
    /*
1546
     * Wait until the dest is ready before attempting to deliver the message
1547
     * by waiting for a READY message.
1548
     */
1549
    if (rdma->control_ready_expected) {
1550
        RDMAControlHeader resp;
1551
        ret = qemu_rdma_exchange_get_response(rdma,
1552
                                    &resp, RDMA_CONTROL_READY, RDMA_WRID_READY);
1553
        if (ret < 0) {
1554
            return ret;
1555
        }
1556
    }
1557

    
1558
    /*
1559
     * If the user is expecting a response, post a WR in anticipation of it.
1560
     */
1561
    if (resp) {
1562
        ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_DATA);
1563
        if (ret) {
1564
            fprintf(stderr, "rdma migration: error posting"
1565
                    " extra control recv for anticipated result!");
1566
            return ret;
1567
        }
1568
    }
1569

    
1570
    /*
1571
     * Post a WR to replace the one we just consumed for the READY message.
1572
     */
1573
    ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
1574
    if (ret) {
1575
        fprintf(stderr, "rdma migration: error posting first control recv!");
1576
        return ret;
1577
    }
1578

    
1579
    /*
1580
     * Deliver the control message that was requested.
1581
     */
1582
    ret = qemu_rdma_post_send_control(rdma, data, head);
1583

    
1584
    if (ret < 0) {
1585
        fprintf(stderr, "Failed to send control buffer!\n");
1586
        return ret;
1587
    }
1588

    
1589
    /*
1590
     * If we're expecting a response, block and wait for it.
1591
     */
1592
    if (resp) {
1593
        if (callback) {
1594
            DDPRINTF("Issuing callback before receiving response...\n");
1595
            ret = callback(rdma);
1596
            if (ret < 0) {
1597
                return ret;
1598
            }
1599
        }
1600

    
1601
        DDPRINTF("Waiting for response %s\n", control_desc[resp->type]);
1602
        ret = qemu_rdma_exchange_get_response(rdma, resp,
1603
                                              resp->type, RDMA_WRID_DATA);
1604

    
1605
        if (ret < 0) {
1606
            return ret;
1607
        }
1608

    
1609
        qemu_rdma_move_header(rdma, RDMA_WRID_DATA, resp);
1610
        if (resp_idx) {
1611
            *resp_idx = RDMA_WRID_DATA;
1612
        }
1613
        DDPRINTF("Response %s received.\n", control_desc[resp->type]);
1614
    }
1615

    
1616
    rdma->control_ready_expected = 1;
1617

    
1618
    return 0;
1619
}

/*
 * This is an 'atomic' high-level operation to receive a single, unified
 * control-channel message.
 */
static int qemu_rdma_exchange_recv(RDMAContext *rdma, RDMAControlHeader *head,
                                int expecting)
{
    RDMAControlHeader ready = {
                                .len = 0,
                                .type = RDMA_CONTROL_READY,
                                .repeat = 1,
                              };
    int ret;

    /*
     * Inform the source that we're ready to receive a message.
     */
    ret = qemu_rdma_post_send_control(rdma, NULL, &ready);

    if (ret < 0) {
        fprintf(stderr, "Failed to send control buffer!\n");
        return ret;
    }

    /*
     * Block and wait for the message.
     */
    ret = qemu_rdma_exchange_get_response(rdma, head,
                                          expecting, RDMA_WRID_READY);

    if (ret < 0) {
        return ret;
    }

    qemu_rdma_move_header(rdma, RDMA_WRID_READY, head);

    /*
     * Post a new RECV work request to replace the one we just consumed.
     */
    ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
    if (ret) {
        fprintf(stderr, "rdma migration: error posting second control recv!");
        return ret;
    }

    return 0;
}
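
/*
 * Rough shape of one control exchange, as implemented by the two helpers
 * above:
 *
 *   destination (qemu_rdma_exchange_recv): SEND a READY message, block for
 *       the request, then re-post a RECV for the next round.
 *
 *   source (qemu_rdma_exchange_send): block for READY when one is expected,
 *       re-post the READY RECV, SEND the request, and optionally block again
 *       for a piggy-backed reply (e.g. the RDMA_CONTROL_REGISTER_RESULT
 *       consumed by qemu_rdma_write_one() below).
 */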

/*
 * Write an actual chunk of memory using RDMA.
 *
 * If we're using dynamic registration on the dest-side, we have to
 * send a registration command first.
 */
static int qemu_rdma_write_one(QEMUFile *f, RDMAContext *rdma,
                               int current_index, uint64_t current_addr,
                               uint64_t length)
{
    struct ibv_sge sge;
    struct ibv_send_wr send_wr = { 0 };
    struct ibv_send_wr *bad_wr;
    int reg_result_idx, ret, count = 0;
    uint64_t chunk, chunks;
    uint8_t *chunk_start, *chunk_end;
    RDMALocalBlock *block = &(rdma->local_ram_blocks.block[current_index]);
    RDMARegister reg;
    RDMARegisterResult *reg_result;
    RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT };
    RDMAControlHeader head = { .len = sizeof(RDMARegister),
                               .type = RDMA_CONTROL_REGISTER_REQUEST,
                               .repeat = 1,
                             };

retry:
    sge.addr = (uint64_t)(block->local_host_addr +
                            (current_addr - block->offset));
    sge.length = length;

    chunk = ram_chunk_index(block->local_host_addr, (uint8_t *) sge.addr);
    chunk_start = ram_chunk_start(block, chunk);

    if (block->is_ram_block) {
        chunks = length / (1UL << RDMA_REG_CHUNK_SHIFT);

        if (chunks && ((length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
            chunks--;
        }
    } else {
        chunks = block->length / (1UL << RDMA_REG_CHUNK_SHIFT);

        if (chunks && ((block->length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
            chunks--;
        }
    }
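
    /*
     * Worked example, assuming the 1 MB chunk size implied by
     * RDMA_REG_CHUNK_SHIFT: a 2 MB, chunk-aligned write gives
     * length >> RDMA_REG_CHUNK_SHIFT == 2, which the exact-multiple check
     * above reduces to 1, so the range spans chunk .. chunk + 1 (two chunks).
     */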

    DDPRINTF("Writing %" PRIu64 " chunks, (%" PRIu64 " MB)\n",
        chunks + 1, (chunks + 1) * (1UL << RDMA_REG_CHUNK_SHIFT) / 1024 / 1024);

    chunk_end = ram_chunk_end(block, chunk + chunks);

    if (!rdma->pin_all) {
#ifdef RDMA_UNREGISTRATION_EXAMPLE
        qemu_rdma_unregister_waiting(rdma);
#endif
    }

    while (test_bit(chunk, block->transit_bitmap)) {
        (void)count;
        DDPRINTF("(%d) Not clobbering: block: %d chunk %" PRIu64
                " current %" PRIu64 " len %" PRIu64 " %d %d\n",
                count++, current_index, chunk,
                sge.addr, length, rdma->nb_sent, block->nb_chunks);

        ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE);

        if (ret < 0) {
            fprintf(stderr, "Failed to Wait for previous write to complete "
                    "block %d chunk %" PRIu64
                    " current %" PRIu64 " len %" PRIu64 " %d\n",
                    current_index, chunk, sge.addr, length, rdma->nb_sent);
            return ret;
        }
    }
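
    /*
     * At this point the transit bit for this chunk is clear, so it is safe
     * to (re)register the chunk and post a fresh write for it below.
     */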

    if (!rdma->pin_all || !block->is_ram_block) {
        if (!block->remote_keys[chunk]) {
            /*
             * This chunk has not yet been registered, so first check to see
             * if the entire chunk is zero. If so, tell the other side to
             * memset() + madvise() the entire chunk without RDMA.
             */

            if (can_use_buffer_find_nonzero_offset((void *)sge.addr, length)
                   && buffer_find_nonzero_offset((void *)sge.addr,
                                                    length) == length) {
                RDMACompress comp = {
                                        .offset = current_addr,
                                        .value = 0,
                                        .block_idx = current_index,
                                        .length = length,
                                    };

                head.len = sizeof(comp);
                head.type = RDMA_CONTROL_COMPRESS;

                DDPRINTF("Entire chunk is zero, sending compress: %"
                    PRIu64 " for %d "
                    "bytes, index: %d, offset: %" PRId64 "...\n",
                    chunk, sge.length, current_index, current_addr);

                compress_to_network(&comp);
                ret = qemu_rdma_exchange_send(rdma, &head,
                                (uint8_t *) &comp, NULL, NULL, NULL);

                if (ret < 0) {
                    return -EIO;
                }

                acct_update_position(f, sge.length, true);

                return 1;
            }

            /*
             * Otherwise, tell other side to register.
             */
            reg.current_index = current_index;
            if (block->is_ram_block) {
                reg.key.current_addr = current_addr;
            } else {
                reg.key.chunk = chunk;
            }
            reg.chunks = chunks;

            DDPRINTF("Sending registration request chunk %" PRIu64 " for %d "
                    "bytes, index: %d, offset: %" PRId64 "...\n",
                    chunk, sge.length, current_index, current_addr);

            register_to_network(&reg);
            ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
                                    &resp, &reg_result_idx, NULL);
            if (ret < 0) {
                return ret;
            }

            /* try to overlap this single registration with the one we sent. */
            if (qemu_rdma_register_and_get_keys(rdma, block,
                                                (uint8_t *) sge.addr,
                                                &sge.lkey, NULL, chunk,
                                                chunk_start, chunk_end)) {
                fprintf(stderr, "cannot get lkey!\n");
                return -EINVAL;
            }

            reg_result = (RDMARegisterResult *)
                    rdma->wr_data[reg_result_idx].control_curr;

            network_to_result(reg_result);

            DDPRINTF("Received registration result:"
                    " my key: %x their key %x, chunk %" PRIu64 "\n",
                    block->remote_keys[chunk], reg_result->rkey, chunk);

            block->remote_keys[chunk] = reg_result->rkey;
            block->remote_host_addr = reg_result->host_addr;
        } else {
            /* already registered before */
            if (qemu_rdma_register_and_get_keys(rdma, block,
                                                (uint8_t *)sge.addr,
                                                &sge.lkey, NULL, chunk,
                                                chunk_start, chunk_end)) {
                fprintf(stderr, "cannot get lkey!\n");
                return -EINVAL;
            }
        }

        send_wr.wr.rdma.rkey = block->remote_keys[chunk];
    } else {
        send_wr.wr.rdma.rkey = block->remote_rkey;

        if (qemu_rdma_register_and_get_keys(rdma, block, (uint8_t *)sge.addr,
                                                     &sge.lkey, NULL, chunk,
                                                     chunk_start, chunk_end)) {
            fprintf(stderr, "cannot get lkey!\n");
            return -EINVAL;
        }
    }

    /*
     * Encode the ram block index and chunk within this wrid.
     * We will use this information at the time of completion
     * to figure out which bitmap to check against and then which
     * chunk in the bitmap to look for.
     */
    send_wr.wr_id = qemu_rdma_make_wrid(RDMA_WRID_RDMA_WRITE,
                                        current_index, chunk);
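
    /*
     * Note: the completion path recovers the message type from this wrid
     * with (wr_id & RDMA_WRID_TYPE_MASK) (see qemu_rdma_save_page());
     * the block index and chunk are presumably unpacked the same way so
     * the matching transit_bitmap bit can be cleared when the write
     * completes.
     */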

    send_wr.opcode = IBV_WR_RDMA_WRITE;
    send_wr.send_flags = IBV_SEND_SIGNALED;
    send_wr.sg_list = &sge;
    send_wr.num_sge = 1;
    send_wr.wr.rdma.remote_addr = block->remote_host_addr +
                                (current_addr - block->offset);

    DDDPRINTF("Posting chunk: %" PRIu64 ", addr: %lx"
              " remote: %lx, bytes %" PRIu32 "\n",
              chunk, sge.addr, send_wr.wr.rdma.remote_addr,
              sge.length);

    /*
     * ibv_post_send() does not return negative error numbers;
     * per the specification they are positive - no idea why.
     */
    ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);

    if (ret == ENOMEM) {
        DDPRINTF("send queue is full. wait a little....\n");
        ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE);
        if (ret < 0) {
            fprintf(stderr, "rdma migration: failed to make "
                            "room in full send queue! %d\n", ret);
            return ret;
        }

        goto retry;

    } else if (ret > 0) {
        perror("rdma migration: post rdma write failed");
        return -ret;
    }

    set_bit(chunk, block->transit_bitmap);
    acct_update_position(f, sge.length, false);
    rdma->total_writes++;

    return 0;
}

/*
 * Push out any unwritten RDMA operations.
 *
 * We support sending out multiple chunks at the same time.
 * Not all of them need to get signaled in the completion queue.
 */
static int qemu_rdma_write_flush(QEMUFile *f, RDMAContext *rdma)
{
    int ret;

    if (!rdma->current_length) {
        return 0;
    }

    ret = qemu_rdma_write_one(f, rdma,
            rdma->current_index, rdma->current_addr, rdma->current_length);

    if (ret < 0) {
        return ret;
    }

    if (ret == 0) {
        rdma->nb_sent++;
        DDDPRINTF("sent total: %d\n", rdma->nb_sent);
    }

    rdma->current_length = 0;
    rdma->current_addr = 0;

    return 0;
}
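
/*
 * Note: each write that actually gets posted (ret == 0 above) bumps nb_sent;
 * qemu_rdma_drain_cq() further below waits for that count to drain before
 * anything that must see all outstanding chunks on the wire.
 */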

static inline int qemu_rdma_buffer_mergable(RDMAContext *rdma,
                    uint64_t offset, uint64_t len)
{
    RDMALocalBlock *block =
        &(rdma->local_ram_blocks.block[rdma->current_index]);
    uint8_t *host_addr = block->local_host_addr + (offset - block->offset);
    uint8_t *chunk_end = ram_chunk_end(block, rdma->current_chunk);

    if (rdma->current_length == 0) {
        return 0;
    }

    /*
     * Only merge into chunk sequentially.
     */
    if (offset != (rdma->current_addr + rdma->current_length)) {
        return 0;
    }

    if (rdma->current_index < 0) {
        return 0;
    }

    if (offset < block->offset) {
        return 0;
    }

    if ((offset + len) > (block->offset + block->length)) {
        return 0;
    }

    if (rdma->current_chunk < 0) {
        return 0;
    }

    if ((host_addr + len) > chunk_end) {
        return 0;
    }

    return 1;
}
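
/*
 * Example of the merge rule above: a buffer is appended to the current batch
 * only when it starts exactly at current_addr + current_length and stays
 * within both the current RAMBlock and the current chunk; anything else
 * forces qemu_rdma_write() below to flush first and start a new batch.
 */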

/*
 * We're not actually writing here, but doing three things:
 *
 * 1. Identify the chunk the buffer belongs to.
 * 2. If the chunk is full or the buffer doesn't belong to the current
 *    chunk, then start a new chunk and flush() the old chunk.
 * 3. To keep the hardware busy, we also group chunks into batches
 *    and only require that a batch gets acknowledged in the completion
 *    queue instead of each individual chunk.
 */
static int qemu_rdma_write(QEMUFile *f, RDMAContext *rdma,
                           uint64_t block_offset, uint64_t offset,
                           uint64_t len)
{
    uint64_t current_addr = block_offset + offset;
    uint64_t index = rdma->current_index;
    uint64_t chunk = rdma->current_chunk;
    int ret;

    /* If we cannot merge it, we flush the current buffer first. */
    if (!qemu_rdma_buffer_mergable(rdma, current_addr, len)) {
        ret = qemu_rdma_write_flush(f, rdma);
        if (ret) {
            return ret;
        }
        rdma->current_length = 0;
        rdma->current_addr = current_addr;

        ret = qemu_rdma_search_ram_block(rdma, block_offset,
                                         offset, len, &index, &chunk);
        if (ret) {
            fprintf(stderr, "ram block search failed\n");
            return ret;
        }
        rdma->current_index = index;
        rdma->current_chunk = chunk;
    }

    /* merge it */
    rdma->current_length += len;

    /* flush it if buffer is too large */
    if (rdma->current_length >= RDMA_MERGE_MAX) {
        return qemu_rdma_write_flush(f, rdma);
    }

    return 0;
}

static void qemu_rdma_cleanup(RDMAContext *rdma)
{
    struct rdma_cm_event *cm_event;
    int ret, idx;

    if (rdma->cm_id) {
        if (rdma->error_state) {
            RDMAControlHeader head = { .len = 0,
                                       .type = RDMA_CONTROL_ERROR,
                                       .repeat = 1,
                                     };
            fprintf(stderr, "Early error. Sending error.\n");
            qemu_rdma_post_send_control(rdma, NULL, &head);
        }

        ret = rdma_disconnect(rdma->cm_id);
        if (!ret) {
            DDPRINTF("waiting for disconnect\n");
            ret = rdma_get_cm_event(rdma->channel, &cm_event);
            if (!ret) {
                rdma_ack_cm_event(cm_event);
            }
        }
        DDPRINTF("Disconnected.\n");
        rdma->cm_id = NULL;
    }

    g_free(rdma->block);
    rdma->block = NULL;

    for (idx = 0; idx <= RDMA_WRID_MAX; idx++) {
        if (rdma->wr_data[idx].control_mr) {
            rdma->total_registrations--;
            ibv_dereg_mr(rdma->wr_data[idx].control_mr);
        }
        rdma->wr_data[idx].control_mr = NULL;
    }

    if (rdma->local_ram_blocks.block) {
        while (rdma->local_ram_blocks.nb_blocks) {
            __qemu_rdma_delete_block(rdma,
                    rdma->local_ram_blocks.block->offset);
        }
    }

    if (rdma->qp) {
        ibv_destroy_qp(rdma->qp);
        rdma->qp = NULL;
    }
    if (rdma->cq) {
        ibv_destroy_cq(rdma->cq);
        rdma->cq = NULL;
    }
    if (rdma->comp_channel) {
        ibv_destroy_comp_channel(rdma->comp_channel);
        rdma->comp_channel = NULL;
    }
    if (rdma->pd) {
        ibv_dealloc_pd(rdma->pd);
        rdma->pd = NULL;
    }
    if (rdma->listen_id) {
        rdma_destroy_id(rdma->listen_id);
        rdma->listen_id = NULL;
    }
    if (rdma->cm_id) {
        rdma_destroy_id(rdma->cm_id);
        rdma->cm_id = NULL;
    }
    if (rdma->channel) {
        rdma_destroy_event_channel(rdma->channel);
        rdma->channel = NULL;
    }
}


static int qemu_rdma_source_init(RDMAContext *rdma, Error **errp, bool pin_all)
{
    int ret, idx;
    Error *local_err = NULL, **temp = &local_err;

    /*
     * Will be validated against destination's actual capabilities
     * after the connect() completes.
     */
    rdma->pin_all = pin_all;

    ret = qemu_rdma_resolve_host(rdma, temp);
    if (ret) {
        goto err_rdma_source_init;
    }

    ret = qemu_rdma_alloc_pd_cq(rdma);
    if (ret) {
        ERROR(temp, "rdma migration: error allocating pd and cq! Your mlock()"
                    " limits may be too low. Please check $ ulimit -a # and "
                    "search for 'ulimit -l' in the output\n");
        goto err_rdma_source_init;
    }

    ret = qemu_rdma_alloc_qp(rdma);
    if (ret) {
        ERROR(temp, "rdma migration: error allocating qp!\n");
        goto err_rdma_source_init;
    }

    ret = qemu_rdma_init_ram_blocks(rdma);
    if (ret) {
        ERROR(temp, "rdma migration: error initializing ram blocks!\n");
        goto err_rdma_source_init;
    }

    for (idx = 0; idx <= RDMA_WRID_MAX; idx++) {
        ret = qemu_rdma_reg_control(rdma, idx);
        if (ret) {
            ERROR(temp, "rdma migration: error registering %d control!\n",
                                                            idx);
            goto err_rdma_source_init;
        }
    }

    return 0;

err_rdma_source_init:
    error_propagate(errp, local_err);
    qemu_rdma_cleanup(rdma);
    return -1;
}

static int qemu_rdma_connect(RDMAContext *rdma, Error **errp)
{
    RDMACapabilities cap = {
                                .version = RDMA_CONTROL_VERSION_CURRENT,
                                .flags = 0,
                           };
    struct rdma_conn_param conn_param = { .initiator_depth = 2,
                                          .retry_count = 5,
                                          .private_data = &cap,
                                          .private_data_len = sizeof(cap),
                                        };
    struct rdma_cm_event *cm_event;
    int ret;

    /*
     * Only negotiate the capability with destination if the user
     * on the source first requested the capability.
     */
    if (rdma->pin_all) {
        DPRINTF("Server pin-all memory requested.\n");
        cap.flags |= RDMA_CAPABILITY_PIN_ALL;
    }

    caps_to_network(&cap);

    ret = rdma_connect(rdma->cm_id, &conn_param);
    if (ret) {
        perror("rdma_connect");
        ERROR(errp, "connecting to destination!\n");
        rdma_destroy_id(rdma->cm_id);
        rdma->cm_id = NULL;
        goto err_rdma_source_connect;
    }

    ret = rdma_get_cm_event(rdma->channel, &cm_event);
    if (ret) {
        perror("rdma_get_cm_event after rdma_connect");
        ERROR(errp, "connecting to destination!\n");
        rdma_ack_cm_event(cm_event);
        rdma_destroy_id(rdma->cm_id);
        rdma->cm_id = NULL;
        goto err_rdma_source_connect;
    }

    if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
        perror("rdma_get_cm_event != EVENT_ESTABLISHED after rdma_connect");
        ERROR(errp, "connecting to destination!\n");
        rdma_ack_cm_event(cm_event);
        rdma_destroy_id(rdma->cm_id);
        rdma->cm_id = NULL;
        goto err_rdma_source_connect;
    }

    memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
    network_to_caps(&cap);

    /*
     * Verify that the *requested* capabilities are supported by the destination
     * and disable them otherwise.
     */
    if (rdma->pin_all && !(cap.flags & RDMA_CAPABILITY_PIN_ALL)) {
        ERROR(errp, "Server cannot support pinning all memory. "
                        "Will register memory dynamically.\n");
        rdma->pin_all = false;
    }

    DPRINTF("Pin all memory: %s\n", rdma->pin_all ? "enabled" : "disabled");

    rdma_ack_cm_event(cm_event);

    ret = qemu_rdma_post_recv_control(rdma, 0);
    if (ret) {
        ERROR(errp, "posting second control recv!\n");
        goto err_rdma_source_connect;
    }

    rdma->control_ready_expected = 1;
    rdma->nb_sent = 0;
    return 0;

err_rdma_source_connect:
    qemu_rdma_cleanup(rdma);
    return -1;
}
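
/*
 * Note: capability negotiation rides entirely inside the CM private_data:
 * qemu_rdma_connect() above ships RDMACapabilities with rdma_connect(), and
 * qemu_rdma_accept() further below reads it back out of the CONNECT_REQUEST
 * event and masks it with known_capabilities, so no extra control message is
 * needed for the handshake.
 */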

static int qemu_rdma_dest_init(RDMAContext *rdma, Error **errp)
{
    int ret = -EINVAL, idx;
    struct sockaddr_in sin;
    struct rdma_cm_id *listen_id;
    char ip[40] = "unknown";

    for (idx = 0; idx <= RDMA_WRID_MAX; idx++) {
        rdma->wr_data[idx].control_len = 0;
        rdma->wr_data[idx].control_curr = NULL;
    }

    if (rdma->host == NULL) {
        ERROR(errp, "RDMA host is not set!\n");
        rdma->error_state = -EINVAL;
        return -1;
    }
    /* create CM channel */
    rdma->channel = rdma_create_event_channel();
    if (!rdma->channel) {
        ERROR(errp, "could not create rdma event channel\n");
        rdma->error_state = -EINVAL;
        return -1;
    }

    /* create CM id */
    ret = rdma_create_id(rdma->channel, &listen_id, NULL, RDMA_PS_TCP);
    if (ret) {
        ERROR(errp, "could not create cm_id!\n");
        goto err_dest_init_create_listen_id;
    }

    memset(&sin, 0, sizeof(sin));
    sin.sin_family = AF_INET;
    sin.sin_port = htons(rdma->port);

    if (rdma->host && strcmp("", rdma->host)) {
        struct hostent *dest_addr;
        dest_addr = gethostbyname(rdma->host);
        if (!dest_addr) {
            ERROR(errp, "migration could not gethostbyname!\n");
            ret = -EINVAL;
            goto err_dest_init_bind_addr;
        }
        memcpy(&sin.sin_addr.s_addr, dest_addr->h_addr,
                dest_addr->h_length);
        inet_ntop(AF_INET, dest_addr->h_addr, ip, sizeof ip);
    } else {
        sin.sin_addr.s_addr = INADDR_ANY;
    }

    DPRINTF("%s => %s\n", rdma->host, ip);

    ret = rdma_bind_addr(listen_id, (struct sockaddr *)&sin);
    if (ret) {
        ERROR(errp, "Error: could not rdma_bind_addr!\n");
        goto err_dest_init_bind_addr;
    }

    rdma->listen_id = listen_id;
    qemu_rdma_dump_gid("dest_init", listen_id);
    return 0;

err_dest_init_bind_addr:
    rdma_destroy_id(listen_id);
err_dest_init_create_listen_id:
    rdma_destroy_event_channel(rdma->channel);
    rdma->channel = NULL;
    rdma->error_state = ret;
    return ret;

}

static void *qemu_rdma_data_init(const char *host_port, Error **errp)
{
    RDMAContext *rdma = NULL;
    InetSocketAddress *addr;

    if (host_port) {
        rdma = g_malloc0(sizeof(RDMAContext));
        memset(rdma, 0, sizeof(RDMAContext));
        rdma->current_index = -1;
        rdma->current_chunk = -1;

        addr = inet_parse(host_port, NULL);
        if (addr != NULL) {
            rdma->port = atoi(addr->port);
            rdma->host = g_strdup(addr->host);
        } else {
            ERROR(errp, "bad RDMA migration address '%s'", host_port);
            g_free(rdma);
            return NULL;
        }
    }

    return rdma;
}

/*
 * QEMUFile interface to the control channel.
 * SEND messages for control only.
 * pc.ram is handled with regular RDMA messages.
 */
static int qemu_rdma_put_buffer(void *opaque, const uint8_t *buf,
                                int64_t pos, int size)
{
    QEMUFileRDMA *r = opaque;
    QEMUFile *f = r->file;
    RDMAContext *rdma = r->rdma;
    size_t remaining = size;
    uint8_t * data = (void *) buf;
    int ret;

    CHECK_ERROR_STATE();

    /*
     * Push out any writes that
     * we've queued up for pc.ram.
     */
    ret = qemu_rdma_write_flush(f, rdma);
    if (ret < 0) {
        rdma->error_state = ret;
        return ret;
    }

    while (remaining) {
        RDMAControlHeader head;

        r->len = MIN(remaining, RDMA_SEND_INCREMENT);
        remaining -= r->len;

        head.len = r->len;
        head.type = RDMA_CONTROL_QEMU_FILE;

        ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL);

        if (ret < 0) {
            rdma->error_state = ret;
            return ret;
        }

        data += r->len;
    }

    return size;
}
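
/*
 * Control-channel payloads larger than RDMA_SEND_INCREMENT are split into
 * several RDMA_CONTROL_QEMU_FILE messages above; the destination buffers
 * each one and doles the bytes back out through qemu_rdma_fill() and
 * qemu_rdma_get_buffer() below.
 */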

static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf,
                             int size, int idx)
{
    size_t len = 0;

    if (rdma->wr_data[idx].control_len) {
        DDDPRINTF("RDMA %" PRId64 " of %d bytes already in buffer\n",
                    rdma->wr_data[idx].control_len, size);

        len = MIN(size, rdma->wr_data[idx].control_len);
        memcpy(buf, rdma->wr_data[idx].control_curr, len);
        rdma->wr_data[idx].control_curr += len;
        rdma->wr_data[idx].control_len -= len;
    }

    return len;
}

/*
 * QEMUFile interface to the control channel.
 * RDMA links don't use bytestreams, so we have to
 * return bytes to QEMUFile opportunistically.
 */
static int qemu_rdma_get_buffer(void *opaque, uint8_t *buf,
                                int64_t pos, int size)
{
    QEMUFileRDMA *r = opaque;
    RDMAContext *rdma = r->rdma;
    RDMAControlHeader head;
    int ret = 0;

    CHECK_ERROR_STATE();

    /*
     * First, we hold on to the last SEND message we
     * were given and dish out the bytes until we run
     * out of bytes.
     */
    r->len = qemu_rdma_fill(r->rdma, buf, size, 0);
    if (r->len) {
        return r->len;
    }

    /*
     * Once we run out, we block and wait for another
     * SEND message to arrive.
     */
    ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE);

    if (ret < 0) {
        rdma->error_state = ret;
        return ret;
    }

    /*
     * SEND was received with new bytes, now try again.
     */
    return qemu_rdma_fill(r->rdma, buf, size, 0);
}

/*
 * Block until all the outstanding chunks have been delivered by the hardware.
 */
static int qemu_rdma_drain_cq(QEMUFile *f, RDMAContext *rdma)
{
    int ret;

    if (qemu_rdma_write_flush(f, rdma) < 0) {
        return -EIO;
    }

    while (rdma->nb_sent) {
        ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE);
        if (ret < 0) {
            fprintf(stderr, "rdma migration: complete polling error!\n");
            return -EIO;
        }
    }

    qemu_rdma_unregister_waiting(rdma);

    return 0;
}

static int qemu_rdma_close(void *opaque)
{
    DPRINTF("Shutting down connection.\n");
    QEMUFileRDMA *r = opaque;
    if (r->rdma) {
        qemu_rdma_cleanup(r->rdma);
        g_free(r->rdma);
    }
    g_free(r);
    return 0;
}

/*
 * Parameters:
 *    @offset == 0 :
 *        This means that 'block_offset' is a full virtual address that does not
 *        belong to a RAMBlock of the virtual machine and instead
 *        represents a private malloc'd memory area that the caller wishes to
 *        transfer.
 *
 *    @offset != 0 :
 *        Offset is an offset to be added to block_offset and used
 *        to also lookup the corresponding RAMBlock.
 *
 *    @size > 0 :
 *        Initiate a transfer of this size.
 *
 *    @size == 0 :
 *        A 'hint' or 'advice' that means that we wish to speculatively
 *        and asynchronously unregister this memory. In this case, there is no
 *        guarantee that the unregister will actually happen, for example,
 *        if the memory is being actively transmitted. Additionally, the memory
 *        may be re-registered at any future time if a write within the same
 *        chunk was requested again, even if you attempted to unregister it
 *        here.
 *
 *    @size < 0 : TODO, not yet supported
 *        Unregister the memory NOW. This means that the caller does not
 *        expect there to be any future RDMA transfers and we just want to clean
 *        things up. This is used in case the upper layer owns the memory and
 *        cannot wait for qemu_fclose() to occur.
 *
 *    @bytes_sent : User-specified pointer to indicate how many bytes were
 *                  sent. Usually, this will not be more than a few bytes of
 *                  the protocol because most transfers are sent asynchronously.
 */
static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
                                  ram_addr_t block_offset, ram_addr_t offset,
                                  size_t size, int *bytes_sent)
{
    QEMUFileRDMA *rfile = opaque;
    RDMAContext *rdma = rfile->rdma;
    int ret;

    CHECK_ERROR_STATE();

    qemu_fflush(f);

    if (size > 0) {
        /*
         * Add this page to the current 'chunk'. If the chunk
         * is full, or the page doesn't belong to the current chunk,
         * an actual RDMA write will occur and a new chunk will be formed.
         */
        ret = qemu_rdma_write(f, rdma, block_offset, offset, size);
        if (ret < 0) {
            fprintf(stderr, "rdma migration: write error! %d\n", ret);
            goto err;
        }

        /*
         * We always return 1 byte because the RDMA
         * protocol is completely asynchronous. We do not yet know
         * whether an identified chunk is zero or not because we're
         * waiting for other pages to potentially be merged with
         * the current chunk. So, we have to call qemu_update_position()
         * later on when the actual write occurs.
         */
        if (bytes_sent) {
            *bytes_sent = 1;
        }
    } else {
        uint64_t index, chunk;

        /* TODO: Change QEMUFileOps prototype to be signed: size_t => long
        if (size < 0) {
            ret = qemu_rdma_drain_cq(f, rdma);
            if (ret < 0) {
                fprintf(stderr, "rdma: failed to synchronously drain"
                                " completion queue before unregistration.\n");
                goto err;
            }
        }
        */

        ret = qemu_rdma_search_ram_block(rdma, block_offset,
                                         offset, size, &index, &chunk);

        if (ret) {
            fprintf(stderr, "ram block search failed\n");
            goto err;
        }

        qemu_rdma_signal_unregister(rdma, index, chunk, 0);

        /*
         * TODO: Synchronous, guaranteed unregistration (should not occur during
         * fast-path). Otherwise, unregisters will process on the next call to
         * qemu_rdma_drain_cq()
        if (size < 0) {
            qemu_rdma_unregister_waiting(rdma);
        }
        */
    }

    /*
     * Drain the Completion Queue if possible, but do not block,
     * just poll.
     *
     * If nothing to poll, the end of the iteration will do this
     * again to make sure we don't overflow the request queue.
     */
    while (1) {
        uint64_t wr_id, wr_id_in;
        int ret = qemu_rdma_poll(rdma, &wr_id_in);
        if (ret < 0) {
            fprintf(stderr, "rdma migration: polling error! %d\n", ret);
            goto err;
        }

        wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;

        if (wr_id == RDMA_WRID_NONE) {
            break;
        }
    }

    return RAM_SAVE_CONTROL_DELAYED;
err:
    rdma->error_state = ret;
    return ret;
}
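
/*
 * Note: a page accepted by the size > 0 path above is only scheduled; the
 * real byte accounting happens later in qemu_rdma_write_one() via
 * acct_update_position(), which is why this hook reports
 * RAM_SAVE_CONTROL_DELAYED and at most 1 byte as 'sent'.
 */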

static int qemu_rdma_accept(RDMAContext *rdma)
{
    RDMACapabilities cap;
    struct rdma_conn_param conn_param = {
                                            .responder_resources = 2,
                                            .private_data = &cap,
                                            .private_data_len = sizeof(cap),
                                         };
    struct rdma_cm_event *cm_event;
    struct ibv_context *verbs;
    int ret = -EINVAL;
    int idx;

    ret = rdma_get_cm_event(rdma->channel, &cm_event);
    if (ret) {
        goto err_rdma_dest_wait;
    }

    if (cm_event->event != RDMA_CM_EVENT_CONNECT_REQUEST) {
        rdma_ack_cm_event(cm_event);
        goto err_rdma_dest_wait;
    }

    memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));

    network_to_caps(&cap);

    if (cap.version < 1 || cap.version > RDMA_CONTROL_VERSION_CURRENT) {
            fprintf(stderr, "Unknown source RDMA version: %d, bailing...\n",
                            cap.version);
            rdma_ack_cm_event(cm_event);
            goto err_rdma_dest_wait;
    }

    /*
     * Respond with only the capabilities this version of QEMU knows about.
     */
    cap.flags &= known_capabilities;

    /*
     * Enable the ones that we do know about.
     * Add other checks here as new ones are introduced.
     */
    if (cap.flags & RDMA_CAPABILITY_PIN_ALL) {
        rdma->pin_all = true;
    }

    rdma->cm_id = cm_event->id;
    verbs = cm_event->id->verbs;

    rdma_ack_cm_event(cm_event);

    DPRINTF("Memory pin all: %s\n", rdma->pin_all ? "enabled" : "disabled");

    caps_to_network(&cap);

    DPRINTF("verbs context after listen: %p\n", verbs);

    if (!rdma->verbs) {
        rdma->verbs = verbs;
    } else if (rdma->verbs != verbs) {
            fprintf(stderr, "ibv context not matching %p, %p!\n",
                    rdma->verbs, verbs);
            goto err_rdma_dest_wait;
    }

    qemu_rdma_dump_id("dest_init", verbs);

    ret = qemu_rdma_alloc_pd_cq(rdma);
    if (ret) {
        fprintf(stderr, "rdma migration: error allocating pd and cq!\n");
        goto err_rdma_dest_wait;
    }

    ret = qemu_rdma_alloc_qp(rdma);
    if (ret) {
        fprintf(stderr, "rdma migration: error allocating qp!\n");
        goto err_rdma_dest_wait;
    }

    ret = qemu_rdma_init_ram_blocks(rdma);
    if (ret) {
        fprintf(stderr, "rdma migration: error initializing ram blocks!\n");
        goto err_rdma_dest_wait;
    }

    for (idx = 0; idx <= RDMA_WRID_MAX; idx++) {
        ret = qemu_rdma_reg_control(rdma, idx);
        if (ret) {
            fprintf(stderr, "rdma: error registering %d control!\n", idx);
            goto err_rdma_dest_wait;
        }
    }

    qemu_set_fd_handler2(rdma->channel->fd, NULL, NULL, NULL, NULL);

    ret = rdma_accept(rdma->cm_id, &conn_param);
    if (ret) {
        fprintf(stderr, "rdma_accept returns %d!\n", ret);
        goto err_rdma_dest_wait;
    }

    ret = rdma_get_cm_event(rdma->channel, &cm_event);
    if (ret) {
        fprintf(stderr, "rdma_accept get_cm_event failed %d!\n", ret);
        goto err_rdma_dest_wait;
    }

    if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
        fprintf(stderr, "rdma_accept not event established!\n");
        rdma_ack_cm_event(cm_event);
        goto err_rdma_dest_wait;
    }

    rdma_ack_cm_event(cm_event);

    ret = qemu_rdma_post_recv_control(rdma, 0);
    if (ret) {
        fprintf(stderr, "rdma migration: error posting second control recv!\n");
        goto err_rdma_dest_wait;
    }

    qemu_rdma_dump_gid("dest_connect", rdma->cm_id);

    return 0;

err_rdma_dest_wait:
    rdma->error_state = ret;
    qemu_rdma_cleanup(rdma);
    return ret;
}

/*
 * During each iteration of the migration, we listen for instructions
 * from the source VM to perform dynamic page registrations before they
 * can perform RDMA operations.
 *
 * We respond with the 'rkey'.
 *
 * Keep doing this until the source tells us to stop.
 */
static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque,
                                         uint64_t flags)
{
    RDMAControlHeader reg_resp = { .len = sizeof(RDMARegisterResult),
                               .type = RDMA_CONTROL_REGISTER_RESULT,
                               .repeat = 0,
                             };
    RDMAControlHeader unreg_resp = { .len = 0,
                               .type = RDMA_CONTROL_UNREGISTER_FINISHED,
                               .repeat = 0,
                             };
    RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT,
                                 .repeat = 1 };
    QEMUFileRDMA *rfile = opaque;
    RDMAContext *rdma = rfile->rdma;
    RDMALocalBlocks *local = &rdma->local_ram_blocks;
    RDMAControlHeader head;
    RDMARegister *reg, *registers;
    RDMACompress *comp;
    RDMARegisterResult *reg_result;
    static RDMARegisterResult results[RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE];
    RDMALocalBlock *block;
    void *host_addr;
    int ret = 0;
    int idx = 0;
    int count = 0;
    int i = 0;

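    /*
     * Each pass of the loop below services one control message:
     * COMPRESS fills a chunk with a constant byte via ram_handle_compressed(),
     * RAM_BLOCKS_REQUEST returns the RAMBlock table,
     * REGISTER_REQUEST answers with per-chunk rkeys,
     * UNREGISTER_REQUEST deregisters them again, and
     * REGISTER_FINISHED ends this round of requests.
     */
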
    CHECK_ERROR_STATE();

    do {
        DDDPRINTF("Waiting for next request %" PRIu64 "...\n", flags);

        ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_NONE);

        if (ret < 0) {
            break;
        }

        if (head.repeat > RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE) {
            fprintf(stderr, "rdma: Too many requests in this message (%d)."
                            " Bailing.\n", head.repeat);
            ret = -EIO;
            break;
        }

        switch (head.type) {
        case RDMA_CONTROL_COMPRESS:
            comp = (RDMACompress *) rdma->wr_data[idx].control_curr;
            network_to_compress(comp);

            DDPRINTF("Zapping zero chunk: %" PRId64
                    " bytes, index %d, offset %" PRId64 "\n",
                    comp->length, comp->block_idx, comp->offset);
            block = &(rdma->local_ram_blocks.block[comp->block_idx]);

            host_addr = block->local_host_addr +
                            (comp->offset - block->offset);

            ram_handle_compressed(host_addr, comp->value, comp->length);
            break;

        case RDMA_CONTROL_REGISTER_FINISHED:
            DDDPRINTF("Current registrations complete.\n");
            goto out;

        case RDMA_CONTROL_RAM_BLOCKS_REQUEST:
            DPRINTF("Initial setup info requested.\n");

            if (rdma->pin_all) {
                ret = qemu_rdma_reg_whole_ram_blocks(rdma);
                if (ret) {
                    fprintf(stderr, "rdma migration: error dest "
                                    "registering ram blocks!\n");
                    goto out;
                }
            }

            /*
             * Dest uses this to prepare to transmit the RAMBlock descriptions
             * to the source VM after connection setup.
             * Both sides use the "remote" structure to communicate and update
             * their "local" descriptions with what was sent.
             */
            for (i = 0; i < local->nb_blocks; i++) {
                rdma->block[i].remote_host_addr =
                    (uint64_t)(local->block[i].local_host_addr);

                if (rdma->pin_all) {
                    rdma->block[i].remote_rkey = local->block[i].mr->rkey;
                }

                rdma->block[i].offset = local->block[i].offset;
                rdma->block[i].length = local->block[i].length;

                remote_block_to_network(&rdma->block[i]);
            }

            blocks.len = rdma->local_ram_blocks.nb_blocks
                                                * sizeof(RDMARemoteBlock);


            ret = qemu_rdma_post_send_control(rdma,
                                        (uint8_t *) rdma->block, &blocks);

            if (ret < 0) {
                fprintf(stderr, "rdma migration: error sending remote info!\n");
                goto out;
            }

            break;
        case RDMA_CONTROL_REGISTER_REQUEST:
            DDPRINTF("There are %d registration requests\n", head.repeat);

            reg_resp.repeat = head.repeat;
            registers = (RDMARegister *) rdma->wr_data[idx].control_curr;

            for (count = 0; count < head.repeat; count++) {
                uint64_t chunk;
                uint8_t *chunk_start, *chunk_end;

                reg = &registers[count];
                network_to_register(reg);

                reg_result = &results[count];

                DDPRINTF("Registration request (%d): index %d, current_addr %"
                         PRIu64 " chunks: %" PRIu64 "\n", count,
                         reg->current_index, reg->key.current_addr, reg->chunks);

                block = &(rdma->local_ram_blocks.block[reg->current_index]);
                if (block->is_ram_block) {
                    host_addr = (block->local_host_addr +
                                (reg->key.current_addr - block->offset));
                    chunk = ram_chunk_index(block->local_host_addr,
                                            (uint8_t *) host_addr);
                } else {
                    chunk = reg->key.chunk;
                    host_addr = block->local_host_addr +
                        (reg->key.chunk * (1UL << RDMA_REG_CHUNK_SHIFT));
                }
                chunk_start = ram_chunk_start(block, chunk);
                chunk_end = ram_chunk_end(block, chunk + reg->chunks);
                if (qemu_rdma_register_and_get_keys(rdma, block,
                            (uint8_t *)host_addr, NULL, &reg_result->rkey,
                            chunk, chunk_start, chunk_end)) {
                    fprintf(stderr, "cannot get rkey!\n");
                    ret = -EINVAL;
                    goto out;
                }

                reg_result->host_addr = (uint64_t) block->local_host_addr;

                DDPRINTF("Registered rkey for this request: %x\n",
                                reg_result->rkey);

                result_to_network(reg_result);
            }

            ret = qemu_rdma_post_send_control(rdma,
                            (uint8_t *) results, &reg_resp);

            if (ret < 0) {
                fprintf(stderr, "Failed to send control buffer!\n");
                goto out;
            }
            break;
        case RDMA_CONTROL_UNREGISTER_REQUEST:
            DDPRINTF("There are %d unregistration requests\n", head.repeat);
            unreg_resp.repeat = head.repeat;
            registers = (RDMARegister *) rdma->wr_data[idx].control_curr;

            for (count = 0; count < head.repeat; count++) {
                reg = &registers[count];
                network_to_register(reg);

                DDPRINTF("Unregistration request (%d): "
                         " index %d, chunk %" PRIu64 "\n",
                         count, reg->current_index, reg->key.chunk);

                block = &(rdma->local_ram_blocks.block[reg->current_index]);

                ret = ibv_dereg_mr(block->pmr[reg->key.chunk]);
                block->pmr[reg->key.chunk] = NULL;

                if (ret != 0) {
                    perror("rdma unregistration chunk failed");
                    ret = -ret;
                    goto out;
                }

                rdma->total_registrations--;

                DDPRINTF("Unregistered chunk %" PRIu64 " successfully.\n",
                            reg->key.chunk);
            }

            ret = qemu_rdma_post_send_control(rdma, NULL, &unreg_resp);

            if (ret < 0) {
                fprintf(stderr, "Failed to send control buffer!\n");
                goto out;
            }
            break;
        case RDMA_CONTROL_REGISTER_RESULT:
            fprintf(stderr, "Invalid RESULT message at dest.\n");
            ret = -EIO;
            goto out;
        default:
            fprintf(stderr, "Unknown control message %s\n",
                                control_desc[head.type]);
            ret = -EIO;
            goto out;
        }
    } while (1);
out:
    if (ret < 0) {
        rdma->error_state = ret;
    }
    return ret;
}

static int qemu_rdma_registration_start(QEMUFile *f, void *opaque,
                                        uint64_t flags)
{
    QEMUFileRDMA *rfile = opaque;
    RDMAContext *rdma = rfile->rdma;

    CHECK_ERROR_STATE();

    DDDPRINTF("start section: %" PRIu64 "\n", flags);
    qemu_put_be64(f, RAM_SAVE_FLAG_HOOK);
    qemu_fflush(f);

    return 0;
}
2987

    
2988
/*
2989
 * Inform dest that dynamic registrations are done for now.
2990
 * First, flush writes, if any.
2991
 */
2992
static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
2993
                                       uint64_t flags)
2994
{
2995
    Error *local_err = NULL, **errp = &local_err;
2996
    QEMUFileRDMA *rfile = opaque;
2997
    RDMAContext *rdma = rfile->rdma;
2998
    RDMAControlHeader head = { .len = 0, .repeat = 1 };
2999
    int ret = 0;
3000

    
3001
    CHECK_ERROR_STATE();
3002

    
3003
    qemu_fflush(f);
3004
    ret = qemu_rdma_drain_cq(f, rdma);
3005

    
3006
    if (ret < 0) {
3007
        goto err;
3008
    }
3009

    
3010
    if (flags == RAM_CONTROL_SETUP) {
3011
        RDMAControlHeader resp = {.type = RDMA_CONTROL_RAM_BLOCKS_RESULT };
3012
        RDMALocalBlocks *local = &rdma->local_ram_blocks;
3013
        int reg_result_idx, i, j, nb_remote_blocks;
3014

    
3015
        head.type = RDMA_CONTROL_RAM_BLOCKS_REQUEST;
3016
        DPRINTF("Sending registration setup for ram blocks...\n");
3017

    
3018
        /*
3019
         * Make sure that we parallelize the pinning on both sides.
3020
         * For very large guests, doing this serially takes a really
3021
         * long time, so we have to 'interleave' the pinning locally
3022
         * with the control messages by performing the pinning on this
3023
         * side before we receive the control response from the other
3024
         * side that the pinning has completed.
3025
         */
3026
        ret = qemu_rdma_exchange_send(rdma, &head, NULL, &resp,
3027
                    &reg_result_idx, rdma->pin_all ?
3028
                    qemu_rdma_reg_whole_ram_blocks : NULL);
3029
        if (ret < 0) {
3030
            ERROR(errp, "receiving remote info!\n");
3031
            return ret;
3032
        }
3033

    
3034
        qemu_rdma_move_header(rdma, reg_result_idx, &resp);
3035
        memcpy(rdma->block,
3036
            rdma->wr_data[reg_result_idx].control_curr, resp.len);
3037

    
3038
        nb_remote_blocks = resp.len / sizeof(RDMARemoteBlock);
3039

    
3040
        /*
3041
         * The protocol uses two different sets of rkeys (mutually exclusive):
3042
         * 1. One key to represent the virtual address of the entire ram block.
3043
         *    (dynamic chunk registration disabled - pin everything with one rkey.)
3044
         * 2. One to represent individual chunks within a ram block.
3045
         *    (dynamic chunk registration enabled - pin individual chunks.)
3046
         *
3047
         * Once the capability is successfully negotiated, the destination transmits
3048
         * the keys to use (or sends them later) including the virtual addresses
3049
         * and then propagates the remote ram block descriptions to his local copy.
3050
         */
3051

    
3052
        if (local->nb_blocks != nb_remote_blocks) {
3053
            ERROR(errp, "ram blocks mismatch #1! "
3054
                        "Your QEMU command line parameters are probably "
3055
                        "not identical on both the source and destination.\n");
3056
            return -EINVAL;
3057
        }
3058

    
3059
        for (i = 0; i < nb_remote_blocks; i++) {
3060
            network_to_remote_block(&rdma->block[i]);
3061

    
3062
            /* search local ram blocks */
3063
            for (j = 0; j < local->nb_blocks; j++) {
3064
                if (rdma->block[i].offset != local->block[j].offset) {
3065
                    continue;
3066
                }
3067

    
3068
                if (rdma->block[i].length != local->block[j].length) {
3069
                    ERROR(errp, "ram blocks mismatch #2! "
3070
                        "Your QEMU command line parameters are probably "
3071
                        "not identical on both the source and destination.\n");
3072
                    return -EINVAL;
3073
                }
3074
                local->block[j].remote_host_addr =
3075
                        rdma->block[i].remote_host_addr;
3076
                local->block[j].remote_rkey = rdma->block[i].remote_rkey;
3077
                break;
3078
            }
3079

    
3080
            if (j >= local->nb_blocks) {
3081
                ERROR(errp, "ram blocks mismatch #3! "
3082
                        "Your QEMU command line parameters are probably "
3083
                        "not identical on both the source and destination.\n");
3084
                return -EINVAL;
3085
            }
3086
        }
3087
    }
3088

    
3089
    DDDPRINTF("Sending registration finish %" PRIu64 "...\n", flags);
3090

    
3091
    head.type = RDMA_CONTROL_REGISTER_FINISHED;
3092
    ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL);
3093

    
3094
    if (ret < 0) {
3095
        goto err;
3096
    }
3097

    
3098
    return 0;
3099
err:
3100
    rdma->error_state = ret;
3101
    return ret;
3102
}
3103

    
3104
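/*
 * Expose the file descriptor of the RDMA completion channel so the generic
 * QEMUFile code can poll it for completion events.
 */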
static int qemu_rdma_get_fd(void *opaque)
{
    QEMUFileRDMA *rfile = opaque;
    RDMAContext *rdma = rfile->rdma;

    return rdma->comp_channel->fd;
}

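/*
 * File ops for the destination side: incoming bytes are pulled in through
 * get_buffer and dynamic page registration requests are handled by
 * qemu_rdma_registration_handle, hooked into the RAM load path.
 */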
const QEMUFileOps rdma_read_ops = {
    .get_buffer    = qemu_rdma_get_buffer,
    .get_fd        = qemu_rdma_get_fd,
    .close         = qemu_rdma_close,
    .hook_ram_load = qemu_rdma_registration_handle,
};

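/*
 * File ops for the source side: before_ram_iterate/after_ram_iterate
 * bracket each RAM iteration with the registration protocol, and save_page
 * overrides the normal page transmission path with RDMA transfers.
 */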
const QEMUFileOps rdma_write_ops = {
    .put_buffer         = qemu_rdma_put_buffer,
    .close              = qemu_rdma_close,
    .before_ram_iterate = qemu_rdma_registration_start,
    .after_ram_iterate  = qemu_rdma_registration_stop,
    .save_page          = qemu_rdma_save_page,
};

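/*
 * Wrap an RDMAContext in a QEMUFile: "w" modes get the source-side write
 * ops, anything else gets the destination-side read ops.
 */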
static void *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
{
    QEMUFileRDMA *r;

    if (qemu_file_mode_is_not_valid(mode)) {
        return NULL;
    }

    r = g_malloc0(sizeof(QEMUFileRDMA));
    r->rdma = rdma;

    if (mode[0] == 'w') {
        r->file = qemu_fopen_ops(r, &rdma_write_ops);
    } else {
        r->file = qemu_fopen_ops(r, &rdma_read_ops);
    }

    return r->file;
}

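/*
 * Fd handler armed by rdma_start_incoming_migration(): a connection request
 * has arrived, so accept it, open a read-mode QEMUFile on top of the
 * connection and hand it to the incoming migration code.
 */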
static void rdma_accept_incoming_migration(void *opaque)
{
    RDMAContext *rdma = opaque;
    int ret;
    QEMUFile *f;
    Error *local_err = NULL, **errp = &local_err;

    DPRINTF("Accepting rdma connection...\n");
    ret = qemu_rdma_accept(rdma);

    if (ret) {
        ERROR(errp, "RDMA Migration initialization failed!\n");
        return;
    }

    DPRINTF("Accepted migration\n");

    f = qemu_fopen_rdma(rdma, "rb");
    if (f == NULL) {
        ERROR(errp, "could not qemu_fopen_rdma!\n");
        qemu_rdma_cleanup(rdma);
        return;
    }

    rdma->migration_started_on_destination = 1;
    process_incoming_migration(f);
}

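/*
 * Destination-side entry point: create and bind the RDMA resources for
 * host_port, start listening and register the accept handler that will
 * complete the connection once the source dials in.
 */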
void rdma_start_incoming_migration(const char *host_port, Error **errp)
{
    int ret;
    RDMAContext *rdma;
    Error *local_err = NULL;

    DPRINTF("Starting RDMA-based incoming migration\n");
    rdma = qemu_rdma_data_init(host_port, &local_err);

    if (rdma == NULL) {
        goto err;
    }

    ret = qemu_rdma_dest_init(rdma, &local_err);

    if (ret) {
        goto err;
    }

    DPRINTF("qemu_rdma_dest_init success\n");

    ret = rdma_listen(rdma->listen_id, 5);

    if (ret) {
        ERROR(errp, "listening on socket!\n");
        goto err;
    }

    DPRINTF("rdma_listen success\n");

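    /*
     * The connection is accepted asynchronously: watch the rdma_cm event
     * channel and let rdma_accept_incoming_migration() run once the source
     * issues its connection request.
     */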
    qemu_set_fd_handler2(rdma->channel->fd, NULL,
                         rdma_accept_incoming_migration, NULL,
                         (void *)(intptr_t) rdma);
    return;
err:
    error_propagate(errp, local_err);
    g_free(rdma);
}

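/*
 * Source-side entry point: initialize the RDMA connection (honouring the
 * MIGRATION_CAPABILITY_X_RDMA_PIN_ALL capability if it was enabled), wrap
 * it in a write-mode QEMUFile and hand control to the generic outgoing
 * migration code.
 */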
void rdma_start_outgoing_migration(void *opaque,
                            const char *host_port, Error **errp)
{
    MigrationState *s = opaque;
    Error *local_err = NULL, **temp = &local_err;
    RDMAContext *rdma = qemu_rdma_data_init(host_port, &local_err);
    int ret = 0;

    if (rdma == NULL) {
        ERROR(temp, "Failed to initialize RDMA data structures! %d\n", ret);
        goto err;
    }

    ret = qemu_rdma_source_init(rdma, &local_err,
        s->enabled_capabilities[MIGRATION_CAPABILITY_X_RDMA_PIN_ALL]);

    if (ret) {
        goto err;
    }

    DPRINTF("qemu_rdma_source_init success\n");
    ret = qemu_rdma_connect(rdma, &local_err);

    if (ret) {
        goto err;
    }

    DPRINTF("qemu_rdma_connect success\n");

    s->file = qemu_fopen_rdma(rdma, "wb");
    migrate_fd_connect(s);
    return;
err:
    error_propagate(errp, local_err);
    g_free(rdma);
    migrate_fd_error(s);
}