root / xseg / peers / user / mt-vlmcd.c @ 6217821b
History | View | Annotate | Download (22.7 kB)
1 |
/*
|
---|---|
2 |
* Copyright 2012 GRNET S.A. All rights reserved.
|
3 |
*
|
4 |
* Redistribution and use in source and binary forms, with or
|
5 |
* without modification, are permitted provided that the following
|
6 |
* conditions are met:
|
7 |
*
|
8 |
* 1. Redistributions of source code must retain the above
|
9 |
* copyright notice, this list of conditions and the following
|
10 |
* disclaimer.
|
11 |
* 2. Redistributions in binary form must reproduce the above
|
12 |
* copyright notice, this list of conditions and the following
|
13 |
* disclaimer in the documentation and/or other materials
|
14 |
* provided with the distribution.
|
15 |
*
|
16 |
* THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
|
17 |
* OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
|
18 |
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
|
19 |
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
|
20 |
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
21 |
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
22 |
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
|
23 |
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
|
24 |
* AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
|
25 |
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
|
26 |
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
27 |
* POSSIBILITY OF SUCH DAMAGE.
|
28 |
*
|
29 |
* The views and conclusions contained in the software and
|
30 |
* documentation are those of the authors and should not be
|
31 |
* interpreted as representing official policies, either expressed
|
32 |
* or implied, of GRNET S.A.
|
33 |
*/
|
34 |
|
35 |
#include <stdio.h> |
36 |
#include <stdlib.h> |
37 |
#include <unistd.h> |
38 |
#include <xseg/xseg.h> |
39 |
#include <xseg/protocol.h> |
40 |
#include <peer.h> |
41 |
#include <sched.h> |
42 |
#include <sys/syscall.h> |
43 |
|
44 |
/* Lifecycle stages of a single vlmc I/O transaction (tracked in vlmc_io.state). */
enum io_state_enum {
	ACCEPTED = 0,	/* request accepted from a client, not yet sent to the mapper */
	MAPPING = 1,	/* mapper request (mreq) in flight */
	SERVING = 2,	/* blocker requests (breqs) in flight */
	CONCLUDED = 3	/* transaction finished (completed or failed) */
};
50 |
|
51 |
/* Volume flag: no new requests may start on the volume; newly accepted
 * requests are parked in pending_reqs until the volume is unfrozen. */
#define VF_VOLUME_FREEZED (1 << 0)

/* Per-volume bookkeeping, stored in vlmcd.volumes keyed by name. */
struct volume_info{
	char name[XSEG_MAX_TARGETLEN + 1];	/* NUL-terminated volume name */
	uint32_t flags;				/* VF_* flags */
	uint32_t active_reqs;			/* requests currently in flight on this volume */
	struct xq *pending_reqs;	/* reqs queued while frozen; lazily allocated */
	struct peer_req *pending_pr;	/* freeze-causing pr waiting for active_reqs to drain */
};
60 |
|
61 |
/* Peer-wide state for the vlmc daemon: downstream ports and the volume table. */
struct vlmcd {
	xport mportno;	/* mapper peer port */
	xport bportno;	/* blocker peer port for data blocks */
	xhash_t *volumes; //hash [volumename] -> struct volume_info
};
66 |
|
67 |
/* Per-request I/O context; one instance hangs off each peer_req's priv. */
struct vlmc_io {
	int err;	/* sticky error flag; checked by conclude_pr to fail/complete */
	struct xlock lock;
	volatile enum io_state_enum state;	/* current stage, see io_state_enum */
	struct xseg_request *mreq;	/* outstanding mapper request, NULL when none */
	struct xseg_request **breqs;	/* array of outstanding blocker requests */
	unsigned long breq_len, breq_cnt;	/* breqs capacity / still-pending count */
};
75 |
|
76 |
/* Print the vlmc-specific command-line options to stderr. */
void custom_peer_usage()
{
	static const char help_text[] =
		"Custom peer options: \n"
		"-mp : mapper port\n"
		"-bp : blocker port for blocks\n"
		"\n";
	fputs(help_text, stderr);
}
83 |
|
84 |
static inline void __set_vio_state(struct vlmc_io *vio, enum io_state_enum state) |
85 |
{ |
86 |
vio->state = state; |
87 |
} |
88 |
|
89 |
static inline enum io_state_enum __get_vio_state(struct vlmc_io *vio) |
90 |
{ |
91 |
enum io_state_enum state;
|
92 |
state = vio->state; |
93 |
return state;
|
94 |
} |
95 |
|
96 |
static inline struct vlmc_io * __get_vlmcio(struct peer_req *pr) |
97 |
{ |
98 |
return (struct vlmc_io *) pr->priv; |
99 |
} |
100 |
|
101 |
static inline struct vlmcd * __get_vlmcd(struct peerd *peer) |
102 |
{ |
103 |
return (struct vlmcd *) peer->priv; |
104 |
} |
105 |
|
106 |
/* Heap-allocate an xq and initialize it empty with capacity nr.
 * Returns NULL on allocation or initialization failure.
 * Caller owns the result and must xq_free() then free() it. */
static struct xq * allocate_queue(xqindex nr)
{
	struct xq *queue = malloc(sizeof *queue);
	if (!queue)
		return NULL;

	if (!xq_alloc_empty(queue, nr)) {
		/* queue body could not be set up; release the container */
		free(queue);
		return NULL;
	}

	return queue;
}
117 |
|
118 |
static int doubleup_queue(struct volume_info *vi) |
119 |
{ |
120 |
//assert vi->pending_reqs
|
121 |
XSEGLOG2(&lc, D, "Doubling up queue of volume %s", vi->name);
|
122 |
struct xq *newq = allocate_queue(vi->pending_reqs->size * 2); |
123 |
if (!newq){
|
124 |
XSEGLOG2(&lc, E, "Doubling up queue of volume %s failed. Allocation error",
|
125 |
vi->name); |
126 |
return -1; |
127 |
} |
128 |
|
129 |
if (__xq_resize(vi->pending_reqs, newq) == Noneidx){
|
130 |
xq_free(newq); |
131 |
free(newq); |
132 |
XSEGLOG2(&lc, E, "Doubling up queue of volume %s failed. Resize error",
|
133 |
vi->name); |
134 |
return -1; |
135 |
} |
136 |
xq_free(vi->pending_reqs); |
137 |
free(vi->pending_reqs); |
138 |
vi->pending_reqs = newq; |
139 |
XSEGLOG2(&lc, D, "Doubling up queue of volume %s completed", vi->name);
|
140 |
return 0; |
141 |
} |
142 |
|
143 |
static struct volume_info * find_volume(struct vlmcd *vlmc, char *volume) |
144 |
{ |
145 |
struct volume_info *vi = NULL; |
146 |
XSEGLOG2(&lc, D, "looking up volume %s", volume);
|
147 |
int r = xhash_lookup(vlmc->volumes, (xhashidx) volume,
|
148 |
(xhashidx *) &vi); |
149 |
if (r < 0){ |
150 |
XSEGLOG2(&lc, D, "looking up volume %s failed", volume);
|
151 |
return NULL; |
152 |
} |
153 |
XSEGLOG2(&lc, D, "looking up volume %s completed. VI: %lx",
|
154 |
volume, (unsigned long)vi); |
155 |
return vi;
|
156 |
} |
157 |
|
158 |
static struct volume_info * find_volume_len(struct vlmcd *vlmc, char *target, |
159 |
uint32_t targetlen) |
160 |
{ |
161 |
char buf[XSEG_MAX_TARGETLEN+1]; |
162 |
strncpy(buf, target, targetlen); |
163 |
buf[targetlen] = 0;
|
164 |
XSEGLOG2(&lc, D, "looking up volume %s, len %u",
|
165 |
buf, targetlen); |
166 |
return find_volume(vlmc, buf);
|
167 |
|
168 |
} |
169 |
|
170 |
/*
 * Insert a volume_info into the volume hash, keyed by its name.
 * Returns < 0 if the volume already exists or the insert/resize failed,
 * >= 0 on success. Grows the hash as needed (XHASH_ERESIZE retry loop).
 */
static int insert_volume(struct vlmcd *vlmc, struct volume_info *vi)
{
	int r = -1;

	/* refuse duplicate entries */
	if (find_volume(vlmc, vi->name)){
		XSEGLOG2(&lc, W, "Volume %s found in hash", vi->name);
		return r;
	}

	XSEGLOG2(&lc, D, "Inserting volume %s, len: %d (volume_info: %lx)",
			vi->name, strlen(vi->name), (unsigned long) vi);
	r = xhash_insert(vlmc->volumes, (xhashidx) vi->name, (xhashidx) vi);
	/* the hash may need growing; retry the insert after each resize */
	while (r == -XHASH_ERESIZE) {
		xhashidx shift = xhash_grow_size_shift(vlmc->volumes);
		xhash_t *new_hashmap = xhash_resize(vlmc->volumes, shift, NULL);
		if (!new_hashmap){
			XSEGLOG2(&lc, E, "Cannot grow vlmc->volumes to sizeshift %llu",
					(unsigned long long) shift);
			return r;
		}
		vlmc->volumes = new_hashmap;
		r = xhash_insert(vlmc->volumes, (xhashidx) vi->name, (xhashidx) vi);
	}
	XSEGLOG2(&lc, D, "Inserting volume %s, len: %d (volume_info: %lx) completed",
			vi->name, strlen(vi->name), (unsigned long) vi);

	return r;

}
199 |
|
200 |
/*
 * Remove a volume_info from the volume hash by name.
 * Shrinks the hash opportunistically (XHASH_ERESIZE retry loop).
 * Returns the xhash_delete result; < 0 on failure. Does NOT free vi —
 * the caller owns it.
 */
static int remove_volume(struct vlmcd *vlmc, struct volume_info *vi)
{
	int r = -1;

	XSEGLOG2(&lc, D, "Removing volume %s, len: %d (volume_info: %lx)",
			vi->name, strlen(vi->name), (unsigned long) vi);
	r = xhash_delete(vlmc->volumes, (xhashidx) vi->name);
	/* deletion may trigger a shrink; retry after each resize */
	while (r == -XHASH_ERESIZE) {
		xhashidx shift = xhash_shrink_size_shift(vlmc->volumes);
		xhash_t *new_hashmap = xhash_resize(vlmc->volumes, shift, NULL);
		if (!new_hashmap){
			XSEGLOG2(&lc, E, "Cannot shrink vlmc->volumes to sizeshift %llu",
					(unsigned long long) shift);
			XSEGLOG2(&lc, E, "Removing volume %s, (volume_info: %lx) failed",
					vi->name, (unsigned long) vi);
			return r;
		}
		vlmc->volumes = new_hashmap;
		r = xhash_delete(vlmc->volumes, (xhashidx) vi->name);
	}
	if (r < 0)
		XSEGLOG2(&lc, W, "Removing volume %s, len: %d (volume_info: %lx) failed",
				vi->name, strlen(vi->name), (unsigned long) vi);
	else
		XSEGLOG2(&lc, D, "Removing volume %s, len: %d (volume_info: %lx) completed",
				vi->name, strlen(vi->name), (unsigned long) vi);
	return r;
}
228 |
|
229 |
static int do_accepted_pr(struct peerd *peer, struct peer_req *pr);

/*
 * Finish a peer request: mark it CONCLUDED and fail or complete it based
 * on vio->err. Also drops the volume's active_reqs count; when the last
 * active request drains and a freeze-causing pr is parked in pending_pr,
 * restart it via do_accepted_pr (mutual recursion with do_accepted_pr).
 */
static int conclude_pr(struct peerd *peer, struct peer_req *pr)
{
	struct vlmcd *vlmc = __get_vlmcd(peer);
	struct vlmc_io *vio = __get_vlmcio(pr);
	char *target = xseg_get_target(peer->xseg, pr->req);
	struct volume_info *vi = find_volume_len(vlmc, target, pr->req->targetlen);

	XSEGLOG2(&lc, D, "Concluding pr %lx, req: %lx vi: %lx", pr, pr->req, vi);

	__set_vio_state(vio, CONCLUDED);
	if (vio->err)
		fail(peer, pr);
	else
		complete(peer, pr);

	/* vi may be NULL if the volume was already removed */
	if (vi){
		//assert vi->active_reqs > 0
		uint32_t ar = --vi->active_reqs;
		XSEGLOG2(&lc, D, "vi: %lx, volume name: %s, active_reqs: %lu, pending_pr: %lx",
				vi, vi->name, ar, vi->pending_pr);
		/* last active request gone: kick the parked freeze request */
		if (!ar && vi->pending_pr)
			do_accepted_pr(peer, vi->pending_pr);
	}
	XSEGLOG2(&lc, D, "Concluded pr %lx, vi: %lx", pr, vi);
	return 0;
}
257 |
|
258 |
static int should_freeze_volume(struct xseg_request *req) |
259 |
{ |
260 |
if (req->op == X_CLOSE || req->op == X_SNAPSHOT ||
|
261 |
(req->op == X_WRITE && !req->size && (req->flags & XF_FLUSH))) |
262 |
return 1; |
263 |
return 0; |
264 |
} |
265 |
|
266 |
/*
 * Start processing an accepted peer request.
 *
 * Flow:
 *   - look up the target volume; fail the pr if missing;
 *   - if the request requires exclusivity (close/snapshot/barrier flush),
 *     freeze the volume; if other requests are still active, park this pr
 *     in vi->pending_pr and return — conclude_pr restarts it on drain;
 *   - zero-size flush writes are completed locally (never sent to the
 *     mapper) and the pending queue is then drained;
 *   - otherwise translate the op to a mapper op (X_MAPR/X_MAPW/...),
 *     submit the mapper request, and move to MAPPING state.
 *
 * Returns 0 on success/parked, -1 on failure (pr is concluded with err).
 */
static int do_accepted_pr(struct peerd *peer, struct peer_req *pr)
{
	struct vlmcd *vlmc = __get_vlmcd(peer);
	struct vlmc_io *vio = __get_vlmcio(pr);
	int r;
	xport p;
	char *target, *mtarget;
	void *dummy;

	struct volume_info *vi;

	XSEGLOG2(&lc, I, "Do accepted pr started for pr %lx", pr);
	target = xseg_get_target(peer->xseg, pr->req);
	if (!target){
		vio->err = 1;
		conclude_pr(peer, pr);
		return -1;
	}

	vi = find_volume_len(vlmc, target, pr->req->targetlen);
	if (!vi){
		XSEGLOG2(&lc, E, "Cannot find volume");
		XSEGLOG2(&lc, E, "Pr %lx", pr);
		vio->err = 1;
		conclude_pr(peer, pr);
		return -1;
	}

	if (should_freeze_volume(pr->req)){
		XSEGLOG2(&lc, I, "Freezing volume %s", vi->name);
		vi->flags |= VF_VOLUME_FREEZED;
		if (vi->active_reqs){
			/* other requests in flight: park until they drain */
			//assert vi->pending_pr == NULL;
			XSEGLOG2(&lc, I, "Active reqs of %s: %lu. Pending pr is set to %lx",
					vi->name, vi->active_reqs, pr);
			vi->pending_pr = pr;
			return 0;
		}
		else {
			/* volume idle (or we were just restarted from pending_pr) */
			XSEGLOG2(&lc, I, "No active reqs of %s. Pending pr is set to NULL",
					vi->name);
			//assert vi->pending_pr == pr
			vi->pending_pr = NULL;
		}

	}

	vi->active_reqs++;

	vio->err = 0; //reset error state

	if (pr->req->op == X_WRITE && !pr->req->size &&
			(pr->req->flags & (XF_FLUSH|XF_FUA))){
		//handle flush requests here, so we don't mess with mapper
		//because of the -1 offset
		vi->flags &= ~VF_VOLUME_FREEZED;
		XSEGLOG2(&lc, I, "Completing flush request");
		pr->req->serviced = pr->req->size;
		conclude_pr(peer, pr);
		/* drain any requests that queued up while we were frozen */
		xqindex xqi;
		while (vi->pending_reqs && !(vi->flags & VF_VOLUME_FREEZED) &&
				(xqi = __xq_pop_head(vi->pending_reqs)) != Noneidx) {
			struct peer_req *ppr = (struct peer_req *) xqi;
			do_accepted_pr(peer, ppr);
		}
		return 0;
	}

	vio->mreq = xseg_get_request(peer->xseg, pr->portno,
					vlmc->mportno, X_ALLOC);
	if (!vio->mreq)
		goto out_err;

	/* use datalen 0. let mapper allocate buffer space as needed */
	r = xseg_prep_request(peer->xseg, vio->mreq, pr->req->targetlen, 0);
	if (r < 0) {
		XSEGLOG2(&lc, E, "Cannot prep request %lx, of pr %lx for volume %s",
				vio->mreq, pr, vi->name);
		goto out_put;
	}
	mtarget = xseg_get_target(peer->xseg, vio->mreq);
	if (!mtarget)
		goto out_put;

	strncpy(mtarget, target, pr->req->targetlen);
	vio->mreq->size = pr->req->size;
	vio->mreq->offset = pr->req->offset;
	vio->mreq->flags = 0;
	/* translate the client op into the corresponding mapper op */
	switch (pr->req->op) {
		case X_READ: vio->mreq->op = X_MAPR; break;
		case X_WRITE: vio->mreq->op = X_MAPW; break;
		case X_INFO: vio->mreq->op = X_INFO; break;
		case X_CLOSE: vio->mreq->op = X_CLOSE; break;
		case X_OPEN: vio->mreq->op = X_OPEN; break;
		case X_SNAPSHOT: vio->mreq->op = X_SNAPSHOT; break;
		default: goto out_put;
	}
	xseg_set_req_data(peer->xseg, vio->mreq, pr);
	__set_vio_state(vio, MAPPING);
	p = xseg_submit(peer->xseg, vio->mreq, pr->portno, X_ALLOC);
	if (p == NoPort)
		goto out_unset;
	r = xseg_signal(peer->xseg, p);
	if (r < 0) {
		/* since submission is successful, just print a warning message */
		XSEGLOG2(&lc, W, "Couldnt signal port %u", p);
	}

	XSEGLOG2(&lc, I, "Pr %lx of volume %s completed", pr, vi->name);
	return 0;

	/* unwind in reverse order of acquisition */
out_unset:
	xseg_get_req_data(peer->xseg, vio->mreq, &dummy);
out_put:
	xseg_put_request(peer->xseg, vio->mreq, pr->portno);
out_err:
	vio->err = 1;
	XSEGLOG2(&lc, E, "Pr %lx of volume %s failed", pr, vi->name);
	conclude_pr(peer, pr);
	return -1;
}
387 |
|
388 |
/*
 * Park a peer request on a frozen volume's pending queue.
 * Lazily allocates the queue on first use and doubles it when full.
 * Returns 0 on success, -1 on allocation/append failure.
 */
static int append_to_pending_reqs(struct volume_info *vi, struct peer_req *pr)
{
	XSEGLOG2(&lc, I, "Appending pr %lx to vi %lx, volume name %s",
			pr, vi, vi->name);
	if (!vi->pending_reqs){
		//allocate 8 as default. FIXME make it relevant to nr_ops;
		vi->pending_reqs = allocate_queue(8);
	}

	if (!vi->pending_reqs){
		XSEGLOG2(&lc, E, "Cannot allocate pending reqs queue for volume %s",
				vi->name);
		XSEGLOG2(&lc, E, "Appending pr %lx to vi %lx, volume name %s failed",
				pr, vi, vi->name);
		return -1;
	}

	xqindex r = __xq_append_tail(vi->pending_reqs, (xqindex) pr);
	if (r == Noneidx){
		/* queue full: grow it and retry once */
		if (doubleup_queue(vi) < 0){
			XSEGLOG2(&lc, E, "Appending pr %lx to vi %lx, volume name %s failed",
					pr, vi, vi->name);
			return -1;
		}
		r = __xq_append_tail(vi->pending_reqs, (xqindex) pr);
	}

	if (r == Noneidx){
		XSEGLOG2(&lc, E, "Appending pr %lx to vi %lx, volume name %s failed",
				pr, vi, vi->name);
		return -1;
	}

	XSEGLOG2(&lc, I, "Appending pr %lx to vi %lx, volume name %s completed",
			pr, vi, vi->name);
	return 0;
}
425 |
|
426 |
/*
 * Entry point for a newly accepted request (ACCEPTED state).
 * Creates the volume_info on first reference to a volume; if the volume
 * is frozen, parks the request in pending_reqs, otherwise starts it via
 * do_accepted_pr. Returns 0 on success/parked, -1 on failure.
 */
static int handle_accepted(struct peerd *peer, struct peer_req *pr,
			struct xseg_request *req)
{
	struct vlmc_io *vio = __get_vlmcio(pr);
	struct vlmcd *vlmc = __get_vlmcd(peer);
	char *target = xseg_get_target(peer->xseg, req);
	struct volume_info *vi = find_volume_len(vlmc, target, req->targetlen);
	XSEGLOG2(&lc, I, "Handle accepted for pr %lx, req %lx started", pr, req);
	if (!vi){
		/* first request for this volume: create its bookkeeping entry */
		vi = malloc(sizeof(struct volume_info));
		if (!vi){
			vio->err = 1;
			conclude_pr(peer, pr);
			return -1;
		}
		strncpy(vi->name, target, req->targetlen);
		vi->name[req->targetlen] = 0;
		vi->flags = 0;
		vi->pending_pr = NULL;
		vi->active_reqs = 0;
		vi->pending_reqs = 0;
		if (insert_volume(vlmc, vi) < 0){
			vio->err = 1;
			conclude_pr(peer, pr);
			free(vi);
			return -1;
		}
	}

	if (vi->flags & VF_VOLUME_FREEZED){
		XSEGLOG2(&lc, I, "Volume %s (vi %lx) frozen. Appending to pending_reqs",
				vi->name, vi);
		if (append_to_pending_reqs(vi, pr) < 0){
			vio->err = 1;
			conclude_pr(peer, pr);
			return -1;
		}
		return 0;
	}

	return do_accepted_pr(peer, pr);
}
468 |
|
469 |
|
470 |
/*
 * Completion path for an X_INFO request answered by the mapper: copy the
 * size from the mapper's reply into the client request's data buffer,
 * then release the mapper request and conclude the pr.
 * NOTE(review): assumes pr->req's data buffer is large enough to hold a
 * struct xseg_reply_info — no resize is done here (unlike
 * mapping_snapshot); verify against the client contract.
 */
static int mapping_info(struct peerd *peer, struct peer_req *pr)
{
	struct vlmc_io *vio = __get_vlmcio(pr);
	if (vio->mreq->state & XS_FAILED){
		XSEGLOG2(&lc, E, "Info req %lx failed",
				(unsigned long)vio->mreq);
		vio->err = 1;
	}
	else {
		struct xseg_reply_info *xinfo = (struct xseg_reply_info *)xseg_get_data(peer->xseg, vio->mreq);
		char *data = xseg_get_data(peer->xseg, pr->req);
		struct xseg_reply_info *xreply = (struct xseg_reply_info *)data;
		xreply->size = xinfo->size;
	}
	xseg_put_request(peer->xseg, vio->mreq, pr->portno);
	vio->mreq = NULL;
	conclude_pr(peer, pr);
	return 0;
}
489 |
|
490 |
static int mapping_open(struct peerd *peer, struct peer_req *pr) |
491 |
{ |
492 |
struct vlmc_io *vio = __get_vlmcio(pr);
|
493 |
if (vio->mreq->state & XS_FAILED){
|
494 |
XSEGLOG2(&lc, E, "Open req %lx failed",
|
495 |
(unsigned long)vio->mreq); |
496 |
vio->err = 1;
|
497 |
} |
498 |
xseg_put_request(peer->xseg, vio->mreq, pr->portno); |
499 |
vio->mreq = NULL;
|
500 |
conclude_pr(peer, pr); |
501 |
return 0; |
502 |
} |
503 |
|
504 |
static int mapping_close(struct peerd *peer, struct peer_req *pr) |
505 |
{ |
506 |
struct vlmcd *vlmc = __get_vlmcd(peer);
|
507 |
struct vlmc_io *vio = __get_vlmcio(pr);
|
508 |
if (vio->mreq->state & XS_FAILED){
|
509 |
XSEGLOG2(&lc, E, "Close req %lx failed",
|
510 |
(unsigned long)vio->mreq); |
511 |
vio->err = 1;
|
512 |
} |
513 |
char *target = xseg_get_target(peer->xseg, pr->req);
|
514 |
struct volume_info *vi = find_volume_len(vlmc, target, pr->req->targetlen);
|
515 |
|
516 |
xseg_put_request(peer->xseg, vio->mreq, pr->portno); |
517 |
vio->mreq = NULL;
|
518 |
conclude_pr(peer, pr); |
519 |
|
520 |
//assert active_reqs == 1
|
521 |
//assert volume freezed
|
522 |
//unfreeze
|
523 |
if (!vi){
|
524 |
XSEGLOG2(&lc, E, "Volume has not volume info");
|
525 |
return 0; |
526 |
} |
527 |
vi->flags &= ~ VF_VOLUME_FREEZED; |
528 |
if (!vi->pending_reqs || !xq_count(vi->pending_reqs)){
|
529 |
XSEGLOG2(&lc, I, "Volume %s (vi %lx) had no pending reqs. Removing",
|
530 |
vi->name, vi); |
531 |
if (vi->pending_reqs)
|
532 |
xq_free(vi->pending_reqs); |
533 |
remove_volume(vlmc, vi); |
534 |
free(vi); |
535 |
} |
536 |
else {
|
537 |
xqindex xqi; |
538 |
XSEGLOG2(&lc, I, "Volume %s (vi %lx) had pending reqs. Handling",
|
539 |
vi->name, vi); |
540 |
while (!(vi->flags & VF_VOLUME_FREEZED) &&
|
541 |
(xqi = __xq_pop_head(vi->pending_reqs)) != Noneidx) { |
542 |
struct peer_req *ppr = (struct peer_req *) xqi; |
543 |
do_accepted_pr(peer, ppr); |
544 |
} |
545 |
XSEGLOG2(&lc, I, "Volume %s (vi %lx) handling pending reqs completed",
|
546 |
vi->name, vi); |
547 |
} |
548 |
return 0; |
549 |
} |
550 |
|
551 |
/*
 * Completion path for an X_SNAPSHOT forwarded to the mapper.
 * On success, resizes the client request to fit a xseg_reply_snapshot
 * (preserving the target name across the resize) and copies the mapper's
 * reply in. Then releases the mapper request, concludes the pr,
 * unfreezes the volume and drains its pending queue.
 */
static int mapping_snapshot(struct peerd *peer, struct peer_req *pr)
{
	struct vlmcd *vlmc = __get_vlmcd(peer);
	struct vlmc_io *vio = __get_vlmcio(pr);
	char *target = xseg_get_target(peer->xseg, pr->req);
	struct volume_info *vi = find_volume_len(vlmc, target, pr->req->targetlen);
	if (vio->mreq->state & XS_FAILED){
		XSEGLOG2(&lc, E, "req %lx (op: %d) failed",
				(unsigned long)vio->mreq, vio->mreq->op);
		vio->err = 1;
	}
	else {
		struct xseg_reply_snapshot *xreply = (struct xseg_reply_snapshot *) xseg_get_data(peer->xseg, vio->mreq);
		/* the resize may move the target; stash the name first */
		char buf[XSEG_MAX_TARGETLEN];
		strncpy(buf, target, pr->req->targetlen);
		int r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen, sizeof(struct xseg_reply_snapshot));
		if (r < 0) {
			XSEGLOG2(&lc, E, "Cannot resize request");
			vio->err = 1;
		}
		else {
			/* restore the target name and copy the reply payload */
			target = xseg_get_target(peer->xseg, pr->req);
			strncpy(target, buf, pr->req->targetlen);
			char *data = xseg_get_data(peer->xseg, pr->req);
			struct xseg_reply_snapshot *xsnapshot = (struct xseg_reply_snapshot *) data;
			*xsnapshot = *xreply;
		}
	}

	xseg_put_request(peer->xseg, vio->mreq, pr->portno);
	vio->mreq = NULL;
	conclude_pr(peer, pr);

	//assert volume freezed
	//unfreeze
	if (!vi){
		XSEGLOG2(&lc, E, "Volume has no volume info");
		return 0;
	}
	XSEGLOG2(&lc, D, "Unfreezing volume %s", vi->name);
	vi->flags &= ~ VF_VOLUME_FREEZED;

	/* drain requests that queued up while the volume was frozen */
	xqindex xqi;
	while (vi->pending_reqs && !(vi->flags & VF_VOLUME_FREEZED) &&
			(xqi = __xq_pop_head(vi->pending_reqs)) != Noneidx) {
		struct peer_req *ppr = (struct peer_req *) xqi;
		do_accepted_pr(peer, ppr);
	}
	return 0;
}
601 |
|
602 |
/*
 * Completion path for X_MAPR/X_MAPW: the mapper replied with the list of
 * block segments backing the requested range. Issue one blocker request
 * per segment, pointing each at the matching slice of the client
 * request's data buffer, and move to SERVING state.
 *
 * On partial submission failure the loop breaks early; breq_cnt records
 * how many blocker requests are actually in flight, and handle_serving
 * concludes the pr (with vio->err set) once they drain. Returns 0 when
 * at least one blocker request was submitted, -1 otherwise.
 */
static int mapping_readwrite(struct peerd *peer, struct peer_req *pr)
{
	struct vlmcd *vlmc = __get_vlmcd(peer);
	struct vlmc_io *vio = __get_vlmcio(pr);
	struct xseg_reply_map *mreply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, vio->mreq);
	uint64_t pos, datalen, offset;
	uint32_t targetlen;
	struct xseg_request *breq;
	char *target;
	int i,r;
	xport p;
	if (vio->mreq->state & XS_FAILED){
		XSEGLOG2(&lc, E, "req %lx (op: %d) failed",
				(unsigned long)vio->mreq, vio->mreq->op);
		xseg_put_request(peer->xseg, vio->mreq, pr->portno);
		vio->mreq = NULL;
		vio->err = 1;
		conclude_pr(peer, pr);
		return 0;
	}

	/* an empty map reply means nothing to serve */
	if (!mreply || !mreply->cnt){
		xseg_put_request(peer->xseg, vio->mreq, pr->portno);
		vio->mreq = NULL;
		vio->err = 1;
		conclude_pr(peer, pr);
		return -1;
	}

	vio->breq_len = mreply->cnt;
	vio->breqs = calloc(vio->breq_len, sizeof(struct xseg_request *));
	if (!vio->breqs) {
		xseg_put_request(peer->xseg, vio->mreq, pr->portno);
		vio->mreq = NULL;
		vio->err = 1;
		conclude_pr(peer, pr);
		return -1;
	}

	pos = 0;
	__set_vio_state(vio, SERVING);
	/* one blocker request per mapped segment */
	for (i = 0; i < vio->breq_len; i++) {
		datalen = mreply->segs[i].size;
		offset = mreply->segs[i].offset;
		targetlen = mreply->segs[i].targetlen;
		breq = xseg_get_request(peer->xseg, pr->portno, vlmc->bportno, X_ALLOC);
		if (!breq) {
			vio->err = 1;
			break;
		}
		r = xseg_prep_request(peer->xseg, breq, targetlen, datalen);
		if (r < 0) {
			vio->err = 1;
			xseg_put_request(peer->xseg, breq, pr->portno);
			break;
		}
		breq->offset = offset;
		breq->size = datalen;
		breq->op = pr->req->op;
		target = xseg_get_target(peer->xseg, breq);
		if (!target) {
			vio->err = 1;
			xseg_put_request(peer->xseg, breq, pr->portno);
			break;
		}
		strncpy(target, mreply->segs[i].target, targetlen);
		r = xseg_set_req_data(peer->xseg, breq, pr);
		if (r<0) {
			vio->err = 1;
			xseg_put_request(peer->xseg, breq, pr->portno);
			break;
		}

		/* alias the blocker request's data onto the matching slice of
		 * the client buffer — NOTE(review): relies on xseg allowing
		 * breq->data reassignment after prep; confirm against the
		 * xseg segment API */
		breq->data = pr->req->data + pos;
		pos += datalen;
		p = xseg_submit(peer->xseg, breq, pr->portno, X_ALLOC);
		if (p == NoPort){
			void *dummy;
			vio->err = 1;
			xseg_get_req_data(peer->xseg, breq, &dummy);
			xseg_put_request(peer->xseg, breq, pr->portno);
			break;
		}
		r = xseg_signal(peer->xseg, p);
		if (r < 0){
			/* submitted successfully; a missed signal is only a warning */
			XSEGLOG2(&lc, W, "Couldnt signal port %u", p);
		}
		vio->breqs[i] = breq;
	}
	/* i = number of blocker requests actually in flight */
	vio->breq_cnt = i;
	xseg_put_request(peer->xseg, vio->mreq, pr->portno);
	vio->mreq = NULL;
	if (i == 0) {
		/* nothing submitted: conclude immediately with error */
		free(vio->breqs);
		vio->breqs = NULL;
		vio->err = 1;
		conclude_pr(peer, pr);
		return -1;
	}
	return 0;
}
704 |
|
705 |
static int handle_mapping(struct peerd *peer, struct peer_req *pr, |
706 |
struct xseg_request *req)
|
707 |
{ |
708 |
struct vlmc_io *vio = __get_vlmcio(pr);
|
709 |
|
710 |
//assert vio>mreq == req
|
711 |
if (vio->mreq != req){
|
712 |
XSEGLOG2(&lc, E ,"vio->mreq %lx, req: %lx state: %d breq[0]: %lx",
|
713 |
(unsigned long)vio->mreq, (unsigned long)req, |
714 |
vio->state, (unsigned long)vio->breqs[0]); |
715 |
return -1; |
716 |
} |
717 |
|
718 |
switch (vio->mreq->op){
|
719 |
case X_INFO:
|
720 |
mapping_info(peer, pr); |
721 |
break;
|
722 |
case X_SNAPSHOT:
|
723 |
mapping_snapshot(peer, pr); |
724 |
break;
|
725 |
case X_CLOSE:
|
726 |
mapping_close(peer, pr); |
727 |
break;
|
728 |
case X_OPEN:
|
729 |
mapping_open(peer, pr); |
730 |
break;
|
731 |
case X_MAPR:
|
732 |
case X_MAPW:
|
733 |
mapping_readwrite(peer, pr); |
734 |
break;
|
735 |
default:
|
736 |
XSEGLOG2(&lc, W, "Invalid mreq op");
|
737 |
//vio->err = 1;
|
738 |
//conclude_pr(peer, pr);
|
739 |
break;
|
740 |
} |
741 |
|
742 |
return 0; |
743 |
} |
744 |
|
745 |
/*
 * Dispatch a blocker reply (SERVING state). Accumulates the serviced
 * byte count into the client request (or flags the pr on failure),
 * releases the blocker request, and concludes the pr once the last
 * outstanding blocker request has drained.
 */
static int handle_serving(struct peerd *peer, struct peer_req *pr,
			struct xseg_request *req)
{
	struct vlmc_io *vio = __get_vlmcio(pr);
	struct vlmcd *vlmc = __get_vlmcd(peer);
	(void)vlmc;
	struct xseg_request *breq = req;

	if (breq->state & XS_FAILED && !(breq->state & XS_SERVED)) {
		XSEGLOG2(&lc, E, "req %lx (op: %d) failed at offset %llu\n",
				(unsigned long)req, req->op,
				(unsigned long long)req->offset);
		vio->err = 1;
	} else {
		//assert breq->serviced == breq->size
		pr->req->serviced += breq->serviced;
	}
	xseg_put_request(peer->xseg, breq, pr->portno);

	/* last blocker reply: tear down the breqs array and conclude */
	if (!--vio->breq_cnt){
		__set_vio_state(vio, CONCLUDED);
		free(vio->breqs);
		vio->breqs = NULL;
		vio->breq_len = 0;
		conclude_pr(peer, pr);
	}
	return 0;
}
773 |
|
774 |
int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req, |
775 |
enum dispatch_reason reason)
|
776 |
{ |
777 |
struct vlmc_io *vio = __get_vlmcio(pr);
|
778 |
struct vlmcd *vlmc = __get_vlmcd(peer);
|
779 |
(void)vlmc;
|
780 |
|
781 |
if (reason == dispatch_accept)
|
782 |
//assert (pr->req == req)
|
783 |
__set_vio_state(vio, ACCEPTED); |
784 |
|
785 |
enum io_state_enum state = __get_vio_state(vio);
|
786 |
switch (state) {
|
787 |
case ACCEPTED:
|
788 |
handle_accepted(peer, pr, req); |
789 |
break;
|
790 |
case MAPPING:
|
791 |
handle_mapping(peer, pr, req); |
792 |
break;
|
793 |
case SERVING:
|
794 |
handle_serving(peer, pr, req); |
795 |
break;
|
796 |
case CONCLUDED:
|
797 |
XSEGLOG2(&lc, W, "invalid state. dispatch called for CONCLUDED");
|
798 |
break;
|
799 |
default:
|
800 |
XSEGLOG2(&lc, E, "wtf dude? invalid state");
|
801 |
break;
|
802 |
} |
803 |
return 0; |
804 |
} |
805 |
|
806 |
|
807 |
int custom_peer_init(struct peerd *peer, int argc, char *argv[]) |
808 |
{ |
809 |
struct vlmc_io *vio;
|
810 |
struct vlmcd *vlmc = malloc(sizeof(struct vlmcd)); |
811 |
int i, j;
|
812 |
|
813 |
if (!vlmc) {
|
814 |
XSEGLOG2(&lc, E, "Cannot alloc vlmc");
|
815 |
return -1; |
816 |
} |
817 |
peer->priv = (void *) vlmc;
|
818 |
|
819 |
vlmc->volumes = xhash_new(3, STRING);
|
820 |
if (!vlmc->volumes){
|
821 |
XSEGLOG2(&lc, E, "Cannot alloc vlmc");
|
822 |
return -1; |
823 |
} |
824 |
vlmc->mportno = NoPort; |
825 |
vlmc->bportno = NoPort; |
826 |
|
827 |
BEGIN_READ_ARGS(argc, argv); |
828 |
READ_ARG_ULONG("-mp", vlmc->mportno);
|
829 |
READ_ARG_ULONG("-bp", vlmc->bportno);
|
830 |
END_READ_ARGS(); |
831 |
|
832 |
if (vlmc->bportno == NoPort) {
|
833 |
XSEGLOG2(&lc, E, "bportno must be provided");
|
834 |
usage(argv[0]);
|
835 |
return -1; |
836 |
} |
837 |
if (vlmc->mportno == NoPort) {
|
838 |
XSEGLOG2(&lc, E, "mportno must be provided");
|
839 |
usage(argv[0]);
|
840 |
return -1; |
841 |
} |
842 |
|
843 |
for (i = 0; i < peer->nr_ops; i++) { |
844 |
vio = malloc(sizeof(struct vlmc_io)); |
845 |
if (!vio) {
|
846 |
break;
|
847 |
} |
848 |
vio->mreq = NULL;
|
849 |
vio->breqs = NULL;
|
850 |
vio->breq_cnt = 0;
|
851 |
vio->breq_len = 0;
|
852 |
xlock_release(&vio->lock); |
853 |
peer->peer_reqs[i].priv = (void *) vio;
|
854 |
} |
855 |
if (i < peer->nr_ops) {
|
856 |
for (j = 0; j < i; j++) { |
857 |
free(peer->peer_reqs[i].priv); |
858 |
} |
859 |
return -1; |
860 |
} |
861 |
|
862 |
|
863 |
const struct sched_param param = { .sched_priority = 99 }; |
864 |
sched_setscheduler(syscall(SYS_gettid), SCHED_FIFO, ¶m); |
865 |
|
866 |
return 0; |
867 |
} |
868 |
|
869 |
/* Peer framework teardown hook. Nothing to release here: per-request
 * contexts and the volume hash are reclaimed with the process. */
void custom_peer_finalize(struct peerd *peer)
{
}