/* xseg/peers/user/mt-vlmcd.c @ ae8c75f4 */
/*
 * Copyright 2012 GRNET S.A. All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or
 * without modification, are permitted provided that the following
 * conditions are met:
 *
 *   1. Redistributions of source code must retain the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer.
 *   2. Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials
 *      provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 *
 * The views and conclusions contained in the software and
 * documentation are those of the authors and should not be
 * interpreted as representing official policies, either expressed
 * or implied, of GRNET S.A.
 */

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <xseg/xseg.h>
#include <xseg/protocol.h>
#include <peer.h>
#include <sched.h>
#include <sys/syscall.h>

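/*
 * vlmcd: the xseg volume composer peer.
 *
 * Every client request accepted on this peer's port is first turned
 * into a mapper request and submitted to the mapper port (-mp). For
 * reads and writes, the mapper's reply is then scattered into one
 * request per object segment towards the blocker port (-bp).
 */
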
enum io_state_enum {
    ACCEPTED = 0,
    MAPPING = 1,
    SERVING = 2,
    CONCLUDED = 3
};

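/*
 * A peer request (vio) moves through these states: ACCEPTED on
 * arrival, MAPPING while its mapper request is in flight, SERVING
 * while its blocker requests are in flight, and CONCLUDED once it
 * has been completed or failed back to the original requester.
 * X_INFO, X_OPEN, X_CLOSE and X_SNAPSHOT conclude directly from
 * MAPPING.
 */
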
#define VF_VOLUME_FREEZED (1 << 0)

struct volume_info {
    char name[XSEG_MAX_TARGETLEN + 1];
    uint32_t flags;
    uint32_t active_reqs;
    struct xq *pending_reqs;
    struct peer_req *pending_pr;
};

struct vlmcd {
    xport mportno;
    xport bportno;
    xhash_t *volumes; //hash: volume name -> struct volume_info
};

struct vlmc_io {
    int err;
    struct xlock lock;
    volatile enum io_state_enum state;
    struct xseg_request *mreq;
    struct xseg_request **breqs;
    unsigned long breq_len, breq_cnt;
};

void custom_peer_usage()
{
    fprintf(stderr, "Custom peer options: \n"
            "-mp : mapper port\n"
            "-bp : blocker port for blocks\n"
            "\n");
}

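/*
 * Example invocation (hypothetical port numbers; the generic peer
 * options, such as the segment spec and this peer's own port, are
 * parsed outside this file):
 *
 *     vlmcd <generic peer options> -mp 0 -bp 1
 */
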
static inline void __set_vio_state(struct vlmc_io *vio, enum io_state_enum state)
{
    vio->state = state;
}

static inline enum io_state_enum __get_vio_state(struct vlmc_io *vio)
{
    enum io_state_enum state;
    state = vio->state;
    return state;
}

static inline struct vlmc_io * __get_vlmcio(struct peer_req *pr)
{
    return (struct vlmc_io *) pr->priv;
}

static inline struct vlmcd * __get_vlmcd(struct peerd *peer)
{
    return (struct vlmcd *) peer->priv;
}

static struct xq * allocate_queue(xqindex nr)
{
    struct xq *q = malloc(sizeof(struct xq));
    if (!q)
        return NULL;
    if (!xq_alloc_empty(q, nr)){
        free(q);
        return NULL;
    }
    return q;
}

static int doubleup_queue(struct volume_info *vi)
{
    //assert vi->pending_reqs
    XSEGLOG2(&lc, D, "Doubling up queue of volume %s", vi->name);
    struct xq *newq = allocate_queue(vi->pending_reqs->size * 2);
    if (!newq){
        XSEGLOG2(&lc, E, "Doubling up queue of volume %s failed. Allocation error",
                vi->name);
        return -1;
    }

    if (__xq_resize(vi->pending_reqs, newq) == Noneidx){
        xq_free(newq);
        free(newq);
        XSEGLOG2(&lc, E, "Doubling up queue of volume %s failed. Resize error",
                vi->name);
        return -1;
    }
    xq_free(vi->pending_reqs);
    free(vi->pending_reqs);
    vi->pending_reqs = newq;
    XSEGLOG2(&lc, D, "Doubling up queue of volume %s completed", vi->name);
    return 0;
}

static struct volume_info * find_volume(struct vlmcd *vlmc, char *volume)
{
    struct volume_info *vi = NULL;
    XSEGLOG2(&lc, D, "looking up volume %s", volume);
    int r = xhash_lookup(vlmc->volumes, (xhashidx) volume,
            (xhashidx *) &vi);
    if (r < 0){
        XSEGLOG2(&lc, D, "looking up volume %s failed", volume);
        return NULL;
    }
    XSEGLOG2(&lc, D, "looking up volume %s completed. VI: %lx",
            volume, (unsigned long)vi);
    return vi;
}

static struct volume_info * find_volume_len(struct vlmcd *vlmc, char *target,
                    uint32_t targetlen)
{
    char buf[XSEG_MAX_TARGETLEN + 1];
    strncpy(buf, target, targetlen);
    buf[targetlen] = 0;
    XSEGLOG2(&lc, D, "looking up volume %s, len %u",
            buf, targetlen);
    return find_volume(vlmc, buf);
}

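/*
 * xhash tables have a fixed capacity: xhash_insert()/xhash_delete()
 * return -XHASH_ERESIZE when the table must first grow or shrink.
 * The two helpers below resize to the suggested size shift and then
 * retry the operation.
 */
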
static int insert_volume(struct vlmcd *vlmc, struct volume_info *vi)
{
    int r = -1;

    if (find_volume(vlmc, vi->name)){
        XSEGLOG2(&lc, W, "Volume %s found in hash", vi->name);
        return r;
    }

    XSEGLOG2(&lc, D, "Inserting volume %s, len: %d (volume_info: %lx)",
            vi->name, strlen(vi->name), (unsigned long) vi);
    r = xhash_insert(vlmc->volumes, (xhashidx) vi->name, (xhashidx) vi);
    while (r == -XHASH_ERESIZE) {
        xhashidx shift = xhash_grow_size_shift(vlmc->volumes);
        xhash_t *new_hashmap = xhash_resize(vlmc->volumes, shift, 0, NULL);
        if (!new_hashmap){
            XSEGLOG2(&lc, E, "Cannot grow vlmc->volumes to sizeshift %llu",
                    (unsigned long long) shift);
            return r;
        }
        vlmc->volumes = new_hashmap;
        r = xhash_insert(vlmc->volumes, (xhashidx) vi->name, (xhashidx) vi);
    }
    XSEGLOG2(&lc, D, "Inserting volume %s, len: %d (volume_info: %lx) completed",
            vi->name, strlen(vi->name), (unsigned long) vi);

    return r;
}

static int remove_volume(struct vlmcd *vlmc, struct volume_info *vi)
{
    int r = -1;

    XSEGLOG2(&lc, D, "Removing volume %s, len: %d (volume_info: %lx)",
            vi->name, strlen(vi->name), (unsigned long) vi);
    r = xhash_delete(vlmc->volumes, (xhashidx) vi->name);
    while (r == -XHASH_ERESIZE) {
        xhashidx shift = xhash_shrink_size_shift(vlmc->volumes);
        xhash_t *new_hashmap = xhash_resize(vlmc->volumes, shift, 0, NULL);
        if (!new_hashmap){
            XSEGLOG2(&lc, E, "Cannot shrink vlmc->volumes to sizeshift %llu",
                    (unsigned long long) shift);
            XSEGLOG2(&lc, E, "Removing volume %s, (volume_info: %lx) failed",
                    vi->name, (unsigned long) vi);
            return r;
        }
        vlmc->volumes = new_hashmap;
        r = xhash_delete(vlmc->volumes, (xhashidx) vi->name);
    }
    if (r < 0)
        XSEGLOG2(&lc, W, "Removing volume %s, len: %d (volume_info: %lx) failed",
                vi->name, strlen(vi->name), (unsigned long) vi);
    else
        XSEGLOG2(&lc, D, "Removing volume %s, len: %d (volume_info: %lx) completed",
                vi->name, strlen(vi->name), (unsigned long) vi);
    return r;
}

static int do_accepted_pr(struct peerd *peer, struct peer_req *pr);

static int conclude_pr(struct peerd *peer, struct peer_req *pr)
{
    struct vlmcd *vlmc = __get_vlmcd(peer);
    struct vlmc_io *vio = __get_vlmcio(pr);
    char *target = xseg_get_target(peer->xseg, pr->req);
    struct volume_info *vi = find_volume_len(vlmc, target, pr->req->targetlen);

    XSEGLOG2(&lc, D, "Concluding pr %lx, req: %lx vi: %lx", pr, pr->req, vi);

    __set_vio_state(vio, CONCLUDED);
    if (vio->err)
        fail(peer, pr);
    else
        complete(peer, pr);

    if (vi){
        //assert vi->active_reqs > 0
        uint32_t ar = --vi->active_reqs;
        XSEGLOG2(&lc, D, "vi: %lx, volume name: %s, active_reqs: %lu, pending_pr: %lx",
                vi, vi->name, ar, vi->pending_pr);
        if (!ar && vi->pending_pr)
            do_accepted_pr(peer, vi->pending_pr);
    }
    XSEGLOG2(&lc, D, "Concluded pr %lx, vi: %lx", pr, vi);
    return 0;
}

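/*
 * Requests that cannot run next to in-flight I/O (close, snapshot,
 * and explicit zero-sized flush writes) freeze their volume. While a
 * volume is frozen, newly accepted requests are parked in
 * pending_reqs, and the freezing request itself waits in pending_pr
 * until active_reqs drains to zero (see conclude_pr() above).
 */
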
static int should_freeze_volume(struct xseg_request *req)
{
    if (req->op == X_CLOSE || req->op == X_SNAPSHOT ||
        (req->op == X_WRITE && !req->size && (req->flags & XF_FLUSH)))
        return 1;
    return 0;
}

static int do_accepted_pr(struct peerd *peer, struct peer_req *pr)
{
    struct vlmcd *vlmc = __get_vlmcd(peer);
    struct vlmc_io *vio = __get_vlmcio(pr);
    int r;
    xport p;
    char *target, *mtarget;
    void *dummy;

    struct volume_info *vi;

    XSEGLOG2(&lc, I, "Do accepted pr started for pr %lx", pr);
    target = xseg_get_target(peer->xseg, pr->req);
    if (!target){
        vio->err = 1;
        conclude_pr(peer, pr);
        return -1;
    }

    vi = find_volume_len(vlmc, target, pr->req->targetlen);
    if (!vi){
        XSEGLOG2(&lc, E, "Cannot find volume");
        XSEGLOG2(&lc, E, "Pr %lx", pr);
        vio->err = 1;
        conclude_pr(peer, pr);
        return -1;
    }

    if (should_freeze_volume(pr->req)){
        XSEGLOG2(&lc, I, "Freezing volume %s", vi->name);
        vi->flags |= VF_VOLUME_FREEZED;
        if (vi->active_reqs){
            //assert vi->pending_pr == NULL
            XSEGLOG2(&lc, I, "Active reqs of %s: %lu. Pending pr is set to %lx",
                    vi->name, vi->active_reqs, pr);
            vi->pending_pr = pr;
            return 0;
        }
        else {
            XSEGLOG2(&lc, I, "No active reqs of %s. Pending pr is set to NULL",
                    vi->name);
            //assert vi->pending_pr == pr
            vi->pending_pr = NULL;
        }
    }

    vi->active_reqs++;

    vio->err = 0; //reset error state

    if (pr->req->op == X_WRITE && !pr->req->size &&
            (pr->req->flags & (XF_FLUSH|XF_FUA))){
        //handle flush requests here, so we don't mess with the mapper
        //because of the -1 offset
        vi->flags &= ~VF_VOLUME_FREEZED;
        XSEGLOG2(&lc, I, "Completing flush request");
        pr->req->serviced = pr->req->size;
        conclude_pr(peer, pr);
        return 0;
    }

    vio->mreq = xseg_get_request(peer->xseg, pr->portno,
                    vlmc->mportno, X_ALLOC);
    if (!vio->mreq)
        goto out_err;

    /* use datalen 0. let the mapper allocate buffer space as needed */
    r = xseg_prep_request(peer->xseg, vio->mreq, pr->req->targetlen, 0);
    if (r < 0) {
        XSEGLOG2(&lc, E, "Cannot prep request %lx, of pr %lx for volume %s",
                vio->mreq, pr, vi->name);
        goto out_put;
    }
    mtarget = xseg_get_target(peer->xseg, vio->mreq);
    if (!mtarget)
        goto out_put;

    strncpy(mtarget, target, pr->req->targetlen);
    vio->mreq->size = pr->req->size;
    vio->mreq->offset = pr->req->offset;
    vio->mreq->flags = 0;
    switch (pr->req->op) {
        case X_READ: vio->mreq->op = X_MAPR; break;
        case X_WRITE: vio->mreq->op = X_MAPW; break;
        case X_INFO: vio->mreq->op = X_INFO; break;
        case X_CLOSE: vio->mreq->op = X_CLOSE; break;
        case X_OPEN: vio->mreq->op = X_OPEN; break;
        case X_SNAPSHOT:
            //FIXME hack
            vio->mreq->op = X_SNAPSHOT;
            vio->mreq->data = pr->req->data;
            break;
        default: goto out_put;
    }
    xseg_set_req_data(peer->xseg, vio->mreq, pr);
    __set_vio_state(vio, MAPPING);
    p = xseg_submit(peer->xseg, vio->mreq, pr->portno, X_ALLOC);
    if (p == NoPort)
        goto out_unset;
    r = xseg_signal(peer->xseg, p);
    if (r < 0) {
        /* submission was successful, so just print a warning message */
        XSEGLOG2(&lc, W, "Couldn't signal port %u", p);
    }

    XSEGLOG2(&lc, I, "Pr %lx of volume %s completed", pr, vi->name);
    return 0;

out_unset:
    xseg_get_req_data(peer->xseg, vio->mreq, &dummy);
out_put:
    xseg_put_request(peer->xseg, vio->mreq, pr->portno);
out_err:
    vio->err = 1;
    XSEGLOG2(&lc, E, "Pr %lx of volume %s failed", pr, vi->name);
    conclude_pr(peer, pr);
    return -1;
}

static int append_to_pending_reqs(struct volume_info *vi, struct peer_req *pr)
{
    XSEGLOG2(&lc, I, "Appending pr %lx to vi %lx, volume name %s",
            pr, vi, vi->name);
    if (!vi->pending_reqs){
        //allocate 8 as default. FIXME make it relevant to nr_ops
        vi->pending_reqs = allocate_queue(8);
    }

    if (!vi->pending_reqs){
        XSEGLOG2(&lc, E, "Cannot allocate pending reqs queue for volume %s",
                vi->name);
        XSEGLOG2(&lc, E, "Appending pr %lx to vi %lx, volume name %s failed",
                pr, vi, vi->name);
        return -1;
    }

    xqindex r = __xq_append_tail(vi->pending_reqs, (xqindex) pr);
    if (r == Noneidx){
        if (doubleup_queue(vi) < 0){
            XSEGLOG2(&lc, E, "Appending pr %lx to vi %lx, volume name %s failed",
                    pr, vi, vi->name);
            return -1;
        }
        r = __xq_append_tail(vi->pending_reqs, (xqindex) pr);
    }

    if (r == Noneidx){
        XSEGLOG2(&lc, E, "Appending pr %lx to vi %lx, volume name %s failed",
                pr, vi, vi->name);
        return -1;
    }

    XSEGLOG2(&lc, I, "Appending pr %lx to vi %lx, volume name %s completed",
            pr, vi, vi->name);
    return 0;
}

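/*
 * volume_info entries are created lazily: the first request that
 * references an unknown volume allocates and inserts one. The entry
 * is removed again when a close completes with no pending requests
 * (see mapping_close() below).
 */
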
static int handle_accepted(struct peerd *peer, struct peer_req *pr,
                struct xseg_request *req)
{
    struct vlmc_io *vio = __get_vlmcio(pr);
    struct vlmcd *vlmc = __get_vlmcd(peer);
    char *target = xseg_get_target(peer->xseg, req);
    struct volume_info *vi = find_volume_len(vlmc, target, req->targetlen);
    XSEGLOG2(&lc, I, "Handle accepted for pr %lx, req %lx started", pr, req);
    if (!vi){
        vi = malloc(sizeof(struct volume_info));
        if (!vi){
            vio->err = 1;
            conclude_pr(peer, pr);
            return -1;
        }
        strncpy(vi->name, target, req->targetlen);
        vi->name[req->targetlen] = 0;
        vi->flags = 0;
        vi->pending_pr = NULL;
        vi->active_reqs = 0;
        vi->pending_reqs = NULL;
        if (insert_volume(vlmc, vi) < 0){
            vio->err = 1;
            conclude_pr(peer, pr);
            free(vi);
            return -1;
        }
    }

    if (vi->flags & VF_VOLUME_FREEZED){
        XSEGLOG2(&lc, I, "Volume %s (vi %lx) frozen. Appending to pending_reqs",
                vi->name, vi);
        if (append_to_pending_reqs(vi, pr) < 0){
            vio->err = 1;
            conclude_pr(peer, pr);
            return -1;
        }
        return 0;
    }

    return do_accepted_pr(peer, pr);
}

static int mapping_info(struct peerd *peer, struct peer_req *pr)
{
    struct vlmc_io *vio = __get_vlmcio(pr);
    struct xseg_request *req = pr->req;
    char *target;
    char buf[XSEG_MAX_TARGETLEN + 1];
    int r;

    if (vio->mreq->state & XS_FAILED){
        XSEGLOG2(&lc, E, "Info req %lx failed",
                (unsigned long)vio->mreq);
        vio->err = 1;
    }
    else {
        if (req->datalen < sizeof(struct xseg_reply_info)) {
            target = xseg_get_target(peer->xseg, req);
            strncpy(buf, target, req->targetlen);
            r = xseg_resize_request(peer->xseg, req, req->targetlen, sizeof(struct xseg_reply_info));
            if (r < 0) {
                XSEGLOG2(&lc, E, "Cannot resize request");
                vio->err = 1;
                goto out;
            }
            target = xseg_get_target(peer->xseg, req);
            strncpy(target, buf, req->targetlen);
        }
        struct xseg_reply_info *xinfo = (struct xseg_reply_info *)xseg_get_data(peer->xseg, vio->mreq);
        char *data = xseg_get_data(peer->xseg, pr->req);
        struct xseg_reply_info *xreply = (struct xseg_reply_info *)data;
        xreply->size = xinfo->size;
    }
out:
    xseg_put_request(peer->xseg, vio->mreq, pr->portno);
    vio->mreq = NULL;
    conclude_pr(peer, pr);
    return 0;
}

static int mapping_open(struct peerd *peer, struct peer_req *pr)
{
    struct vlmc_io *vio = __get_vlmcio(pr);
    if (vio->mreq->state & XS_FAILED){
        XSEGLOG2(&lc, E, "Open req %lx failed",
                (unsigned long)vio->mreq);
        vio->err = 1;
    }
    xseg_put_request(peer->xseg, vio->mreq, pr->portno);
    vio->mreq = NULL;
    conclude_pr(peer, pr);
    return 0;
}

static int mapping_close(struct peerd *peer, struct peer_req *pr)
{
    struct vlmcd *vlmc = __get_vlmcd(peer);
    struct vlmc_io *vio = __get_vlmcio(pr);
    if (vio->mreq->state & XS_FAILED){
        XSEGLOG2(&lc, E, "Close req %lx failed",
                (unsigned long)vio->mreq);
        vio->err = 1;
    }
    char *target = xseg_get_target(peer->xseg, pr->req);
    struct volume_info *vi = find_volume_len(vlmc, target, pr->req->targetlen);

    xseg_put_request(peer->xseg, vio->mreq, pr->portno);
    vio->mreq = NULL;
    conclude_pr(peer, pr);

    //assert active_reqs == 1
    //assert volume frozen
    //unfreeze
    if (!vi){
        XSEGLOG2(&lc, E, "Volume has no volume info");
        return 0;
    }
    vi->flags &= ~VF_VOLUME_FREEZED;
    if (!vi->pending_reqs || !xq_count(vi->pending_reqs)){
        XSEGLOG2(&lc, I, "Volume %s (vi %lx) had no pending reqs. Removing",
                vi->name, vi);
        if (vi->pending_reqs){
            xq_free(vi->pending_reqs);
            free(vi->pending_reqs);
        }
        remove_volume(vlmc, vi);
        free(vi);
    }
    else {
        xqindex xqi;
        XSEGLOG2(&lc, I, "Volume %s (vi %lx) had pending reqs. Handling",
                vi->name, vi);
        while (!(vi->flags & VF_VOLUME_FREEZED) &&
                (xqi = __xq_pop_head(vi->pending_reqs)) != Noneidx) {
            struct peer_req *ppr = (struct peer_req *) xqi;
            do_accepted_pr(peer, ppr);
        }
        XSEGLOG2(&lc, I, "Volume %s (vi %lx) handling pending reqs completed",
                vi->name, vi);
    }
    return 0;
}

static int mapping_snapshot(struct peerd *peer, struct peer_req *pr)
{
    struct vlmcd *vlmc = __get_vlmcd(peer);
    struct vlmc_io *vio = __get_vlmcio(pr);
    char *target = xseg_get_target(peer->xseg, pr->req);
    struct volume_info *vi = find_volume_len(vlmc, target, pr->req->targetlen);
    if (vio->mreq->state & XS_FAILED){
        XSEGLOG2(&lc, E, "req %lx (op: %d) failed",
                (unsigned long)vio->mreq, vio->mreq->op);
        vio->err = 1;
    }

    xseg_put_request(peer->xseg, vio->mreq, pr->portno);
    vio->mreq = NULL;
    conclude_pr(peer, pr);

    //assert volume frozen
    //unfreeze
    if (!vi){
        XSEGLOG2(&lc, E, "Volume has no volume info");
        return 0;
    }
    XSEGLOG2(&lc, D, "Unfreezing volume %s", vi->name);
    vi->flags &= ~VF_VOLUME_FREEZED;

    xqindex xqi;
    while (vi->pending_reqs && !(vi->flags & VF_VOLUME_FREEZED) &&
            (xqi = __xq_pop_head(vi->pending_reqs)) != Noneidx) {
        struct peer_req *ppr = (struct peer_req *) xqi;
        do_accepted_pr(peer, ppr);
    }
    return 0;
}

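/*
 * A served X_MAPR/X_MAPW request carries an xseg_reply_map in its
 * data buffer: cnt object segments, each with a target name, offset
 * and size. One blocker request is issued per segment; they all
 * point into the original request's data buffer at consecutive
 * offsets.
 */
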
static int mapping_readwrite(struct peerd *peer, struct peer_req *pr)
{
    struct vlmcd *vlmc = __get_vlmcd(peer);
    struct vlmc_io *vio = __get_vlmcio(pr);
    struct xseg_reply_map *mreply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, vio->mreq);
    uint64_t pos, datalen, offset;
    uint32_t targetlen;
    struct xseg_request *breq;
    char *target;
    int i, r;
    xport p;
    if (vio->mreq->state & XS_FAILED){
        XSEGLOG2(&lc, E, "req %lx (op: %d) failed",
                (unsigned long)vio->mreq, vio->mreq->op);
        xseg_put_request(peer->xseg, vio->mreq, pr->portno);
        vio->mreq = NULL;
        vio->err = 1;
        conclude_pr(peer, pr);
        return 0;
    }

    if (!mreply || !mreply->cnt){
        xseg_put_request(peer->xseg, vio->mreq, pr->portno);
        vio->mreq = NULL;
        vio->err = 1;
        conclude_pr(peer, pr);
        return -1;
    }

    vio->breq_len = mreply->cnt;
    vio->breqs = calloc(vio->breq_len, sizeof(struct xseg_request *));
    if (!vio->breqs) {
        xseg_put_request(peer->xseg, vio->mreq, pr->portno);
        vio->mreq = NULL;
        vio->err = 1;
        conclude_pr(peer, pr);
        return -1;
    }

    pos = 0;
    __set_vio_state(vio, SERVING);
    for (i = 0; i < vio->breq_len; i++) {
        datalen = mreply->segs[i].size;
        offset = mreply->segs[i].offset;
        targetlen = mreply->segs[i].targetlen;
        breq = xseg_get_request(peer->xseg, pr->portno, vlmc->bportno, X_ALLOC);
        if (!breq) {
            vio->err = 1;
            break;
        }
        r = xseg_prep_request(peer->xseg, breq, targetlen, datalen);
        if (r < 0) {
            vio->err = 1;
            xseg_put_request(peer->xseg, breq, pr->portno);
            break;
        }
        breq->offset = offset;
        breq->size = datalen;
        breq->op = pr->req->op;
        target = xseg_get_target(peer->xseg, breq);
        if (!target) {
            vio->err = 1;
            xseg_put_request(peer->xseg, breq, pr->portno);
            break;
        }
        strncpy(target, mreply->segs[i].target, targetlen);
        r = xseg_set_req_data(peer->xseg, breq, pr);
        if (r < 0) {
            vio->err = 1;
            xseg_put_request(peer->xseg, breq, pr->portno);
            break;
        }

        //share the original request's data buffer, one slice per segment
        breq->data = pr->req->data + pos;
        pos += datalen;
        p = xseg_submit(peer->xseg, breq, pr->portno, X_ALLOC);
        if (p == NoPort){
            void *dummy;
            vio->err = 1;
            xseg_get_req_data(peer->xseg, breq, &dummy);
            xseg_put_request(peer->xseg, breq, pr->portno);
            break;
        }
        r = xseg_signal(peer->xseg, p);
        if (r < 0){
            XSEGLOG2(&lc, W, "Couldn't signal port %u", p);
        }
        vio->breqs[i] = breq;
    }
    vio->breq_cnt = i;
    xseg_put_request(peer->xseg, vio->mreq, pr->portno);
    vio->mreq = NULL;
    if (i == 0) {
        free(vio->breqs);
        vio->breqs = NULL;
        vio->err = 1;
        conclude_pr(peer, pr);
        return -1;
    }
    return 0;
}

static int handle_mapping(struct peerd *peer, struct peer_req *pr,
                struct xseg_request *req)
{
    struct vlmc_io *vio = __get_vlmcio(pr);

    //assert vio->mreq == req
    if (vio->mreq != req){
        XSEGLOG2(&lc, E, "vio->mreq %lx, req: %lx state: %d breq[0]: %lx",
                (unsigned long)vio->mreq, (unsigned long)req,
                vio->state, (unsigned long)vio->breqs[0]);
        return -1;
    }

    switch (vio->mreq->op){
        case X_INFO:
            mapping_info(peer, pr);
            break;
        case X_SNAPSHOT:
            mapping_snapshot(peer, pr);
            break;
        case X_CLOSE:
            mapping_close(peer, pr);
            break;
        case X_OPEN:
            mapping_open(peer, pr);
            break;
        case X_MAPR:
        case X_MAPW:
            mapping_readwrite(peer, pr);
            break;
        default:
            XSEGLOG2(&lc, W, "Invalid mreq op");
            //vio->err = 1;
            //conclude_pr(peer, pr);
            break;
    }

    return 0;
}

static int handle_serving(struct peerd *peer, struct peer_req *pr,
                struct xseg_request *req)
{
    struct vlmc_io *vio = __get_vlmcio(pr);
    struct vlmcd *vlmc = __get_vlmcd(peer);
    (void)vlmc;
    struct xseg_request *breq = req;

    if (breq->state & XS_FAILED && !(breq->state & XS_SERVED)) {
        XSEGLOG2(&lc, E, "req %lx (op: %d) failed at offset %llu\n",
                (unsigned long)req, req->op,
                (unsigned long long)req->offset);
        vio->err = 1;
    } else {
        //assert breq->serviced == breq->size
        pr->req->serviced += breq->serviced;
    }
    xseg_put_request(peer->xseg, breq, pr->portno);

    if (!--vio->breq_cnt){
        __set_vio_state(vio, CONCLUDED);
        free(vio->breqs);
        vio->breqs = NULL;
        vio->breq_len = 0;
        conclude_pr(peer, pr);
    }
    return 0;
}

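/*
 * dispatch() is this peer's entry point, called by the generic peer
 * code both for newly accepted requests (dispatch_accept) and for
 * replies received from the mapper or the blocker; the vio state
 * selects the appropriate handler.
 */
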
int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
        enum dispatch_reason reason)
{
    struct vlmc_io *vio = __get_vlmcio(pr);
    struct vlmcd *vlmc = __get_vlmcd(peer);
    (void)vlmc;

    if (reason == dispatch_accept)
        //assert (pr->req == req)
        __set_vio_state(vio, ACCEPTED);

    enum io_state_enum state = __get_vio_state(vio);
    switch (state) {
        case ACCEPTED:
            handle_accepted(peer, pr, req);
            break;
        case MAPPING:
            handle_mapping(peer, pr, req);
            break;
        case SERVING:
            handle_serving(peer, pr, req);
            break;
        case CONCLUDED:
            XSEGLOG2(&lc, W, "invalid state. dispatch called for CONCLUDED");
            break;
        default:
            XSEGLOG2(&lc, E, "invalid state");
            break;
    }
    return 0;
}

int custom_peer_init(struct peerd *peer, int argc, char *argv[])
{
    struct vlmc_io *vio;
    struct vlmcd *vlmc = malloc(sizeof(struct vlmcd));
    int i, j;

    if (!vlmc) {
        XSEGLOG2(&lc, E, "Cannot alloc vlmc");
        return -1;
    }
    peer->priv = (void *) vlmc;

    vlmc->volumes = xhash_new(3, 0, STRING);
    if (!vlmc->volumes){
        XSEGLOG2(&lc, E, "Cannot alloc volumes hash");
        return -1;
    }
    vlmc->mportno = NoPort;
    vlmc->bportno = NoPort;

    BEGIN_READ_ARGS(argc, argv);
    READ_ARG_ULONG("-mp", vlmc->mportno);
    READ_ARG_ULONG("-bp", vlmc->bportno);
    END_READ_ARGS();

    if (vlmc->bportno == NoPort) {
        XSEGLOG2(&lc, E, "bportno must be provided");
        usage(argv[0]);
        return -1;
    }
    if (vlmc->mportno == NoPort) {
        XSEGLOG2(&lc, E, "mportno must be provided");
        usage(argv[0]);
        return -1;
    }

    for (i = 0; i < peer->nr_ops; i++) {
        vio = malloc(sizeof(struct vlmc_io));
        if (!vio) {
            break;
        }
        vio->mreq = NULL;
        vio->breqs = NULL;
        vio->breq_cnt = 0;
        vio->breq_len = 0;
        xlock_release(&vio->lock);
        peer->peer_reqs[i].priv = (void *) vio;
    }
    if (i < peer->nr_ops) {
        for (j = 0; j < i; j++) {
            free(peer->peer_reqs[j].priv);
        }
        return -1;
    }

    const struct sched_param param = { .sched_priority = 99 };
    sched_setscheduler(syscall(SYS_gettid), SCHED_FIFO, &param);

    return 0;
}

void custom_peer_finalize(struct peerd *peer)
{
    return;
}