root / xseg / peers / user / glusterd.c @ feature-glusterd
History | View | Annotate | Download (33.2 kB)
1 |
/*
|
---|---|
2 |
* Copyright 2012 GRNET S.A. All rights reserved.
|
3 |
*
|
4 |
* Redistribution and use in source and binary forms, with or
|
5 |
* without modification, are permitted provided that the following
|
6 |
* conditions are met:
|
7 |
*
|
8 |
* 1. Redistributions of source code must retain the above
|
9 |
* copyright notice, this list of conditions and the following
|
10 |
* disclaimer.
|
11 |
* 2. Redistributions in binary form must reproduce the above
|
12 |
* copyright notice, this list of conditions and the following
|
13 |
* disclaimer in the documentation and/or other materials
|
14 |
* provided with the distribution.
|
15 |
*
|
16 |
* THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
|
17 |
* OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
|
18 |
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
|
19 |
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
|
20 |
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
21 |
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
22 |
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
|
23 |
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
|
24 |
* AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
|
25 |
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
|
26 |
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
27 |
* POSSIBILITY OF SUCH DAMAGE.
|
28 |
*
|
29 |
* The views and conclusions contained in the software and
|
30 |
* documentation are those of the authors and should not be
|
31 |
* interpreted as representing official policies, either expressed
|
32 |
* or implied, of GRNET S.A.
|
33 |
*/
|
34 |
|
35 |
#include <stdio.h> |
36 |
#include <stdlib.h> |
37 |
#include <unistd.h> |
38 |
#include <xseg/xseg.h> |
39 |
#include <peer.h> |
40 |
#include <xseg/protocol.h> |
41 |
#include <glusterfs/api/glfs.h> |
42 |
#include <pthread.h> |
43 |
#include <ctype.h> |
44 |
#include <errno.h> |
45 |
#include <hash.h> |
46 |
#include <sys/types.h> |
47 |
#include <unistd.h> |
48 |
|
49 |
|
50 |
#define MAX_UNIQUESTR_LEN 128 |
51 |
#define LOCK_SUFFIX "_lock" |
52 |
#define LOCK_SUFFIX_LEN 5 |
53 |
#define HASH_SUFFIX "_hash" |
54 |
#define HASH_SUFFIX_LEN 5 |
55 |
|
56 |
#define MAX_OBJ_NAME (XSEG_MAX_TARGETLEN + LOCK_SUFFIX_LEN + 1) |
57 |
#define MAX_GLFS_ARG_LEN 64 |
58 |
|
59 |
/*
 * Print the gluster-specific command line options on stderr.
 * Called by the generic peer framework when usage is requested.
 */
void custom_peer_usage()
{
	fprintf(stderr, "Custom peer options:\n"
		" Option | Default | \n"
		" --------------------------------------------\n"
		" --transport | tcp | Transport type [tcp|rdma|unix]\n"
		" --server | 127.0.0.1 | A server that is part of the\n"
		" | | volume\n"
		" --port | 0(=24007) | Port of server's gluster daemon\n"
		" --volume | None | Gluster volume to connect to\n"
		" --uniquestr | pid | Unique string for this instance\n"
		/* fixed typo: "glusted" -> "glusterd" */
		" --async | 0 | If 1, glusterd will use the async\n"
		" | | equivalent functions\n"
		"\n"
	);
}
75 |
|
76 |
/* Internal request-state codes returned by the begin_/complete_ helpers. */
#define REQ_UNDEFINED -2	/* no handler branch matched the request */
#define REQ_FAILED -1		/* operation failed; fail() the request */
#define REQ_SUBMITTED 0		/* async op in flight; wait for the callback */
#define REQ_COMPLETED 1		/* all data serviced; complete() the request */

/* Position of a request in the per-operation state machine. */
enum gluster_state {
	ACCEPTED = 0,		/* freshly accepted; nothing submitted yet */
	PENDING = 1,
	READING = 2,		/* read submitted; expecting a read callback */
	WRITING = 3,		/* write submitted; expecting a write callback */
	STATING = 4,
	PREHASHING = 5,		/* reading a precalculated hash file */
	POSTHASHING = 6		/* writing back a freshly calculated hash */
};

/* Per-peer gluster connection state (stored in peerd->priv). */
struct glusterd {
	glfs_t *glfs;				/* libgfapi volume handle */
	char uniquestr[MAX_UNIQUESTR_LEN + 1];	/* this instance's lock-owner id */
	int uniquestr_len;
	int async;		/* if nonzero, use the glfs_*_async variants */
};

/* Per-request private state (stored in peer_req->priv). */
struct gluster_io{
	char obj_name[MAX_OBJ_NAME + 1];	/* primary target object name */
	glfs_fd_t *fd;		/* open fd for the in-flight operation */
	enum gluster_state state;
	uint64_t size;
	char *second_name, *buf; /* scratch name/bounce buffer for copy/hash */
	uint64_t read;		/* bytes read so far into buf */
};
106 |
|
107 |
static inline struct glusterd *__get_gluster(struct peerd *peer) |
108 |
{ |
109 |
return peer->priv;
|
110 |
} |
111 |
|
112 |
static inline int __set_glfs(struct glusterd *gluster, char *volume) |
113 |
{ |
114 |
gluster->glfs = glfs_new(volume); |
115 |
|
116 |
if (!gluster->glfs)
|
117 |
return -1; |
118 |
|
119 |
return 0; |
120 |
} |
121 |
|
122 |
/* Accessor for the cached glfs volume handle. */
static inline glfs_t *__get_glfs(struct glusterd *gluster)
{
	return gluster->glfs;
}
126 |
|
127 |
int handle_read(struct peerd *peer, struct peer_req *pr); |
128 |
int handle_write(struct peerd *peer, struct peer_req *pr); |
129 |
|
130 |
|
131 |
/*
 * Completion callback for glfs_pread_async(): record the call's return
 * value in pr->retval and re-dispatch the request so the owning handler
 * resumes (e.g. handle_read() in state READING).
 */
static void gluster_complete_read(struct glfs_fd *fd, ssize_t ret, void *data)
{
	struct peer_req *pr = (struct peer_req*)data;
	struct peerd *peer = pr->peer;

	pr->retval = ret;
	dispatch(peer, pr, pr->req, dispatch_internal);
}
139 |
|
140 |
/*
 * Completion callback for glfs_pwrite_async(): record the call's return
 * value in pr->retval and re-dispatch the request so the owning handler
 * resumes (e.g. handle_write() in state WRITING).
 */
static void gluster_complete_write(struct glfs_fd *fd, ssize_t ret, void *data)
{
	struct peer_req *pr = (struct peer_req*)data;
	struct peerd *peer = pr->peer;

	pr->retval = ret;
	dispatch(peer, pr, pr->req, dispatch_internal);
}
148 |
|
149 |
static void submit_hook(struct peer_req *pr) |
150 |
{ |
151 |
struct peerd *peer = pr->peer;
|
152 |
struct glusterd *gluster = __get_gluster(peer);
|
153 |
|
154 |
if (!gluster->async)
|
155 |
dispatch(peer, pr, pr->req, dispatch_internal); |
156 |
} |
157 |
|
158 |
static void create_hash_name(struct peer_req *pr, char *hash_name) |
159 |
{ |
160 |
struct gluster_io *gio = (struct gluster_io *) pr->priv; |
161 |
int pos = 0; |
162 |
|
163 |
strncpy(hash_name, gio->obj_name, strlen(gio->obj_name)); |
164 |
pos += strlen(gio->obj_name); |
165 |
strncpy(hash_name + pos, HASH_SUFFIX, HASH_SUFFIX_LEN); |
166 |
pos += HASH_SUFFIX_LEN; |
167 |
hash_name[pos] = 0;
|
168 |
} |
169 |
|
170 |
static int allocate_gio_secname(struct peer_req *pr) |
171 |
{ |
172 |
struct gluster_io *gio = (struct gluster_io *) pr->priv; |
173 |
|
174 |
gio->second_name = malloc(MAX_OBJ_NAME + 1);
|
175 |
if (!gio->second_name)
|
176 |
return -1; |
177 |
|
178 |
return 0; |
179 |
} |
180 |
|
181 |
static int allocate_gio_buf(struct peer_req *pr) |
182 |
{ |
183 |
struct gluster_io *gio = (struct gluster_io *) pr->priv; |
184 |
struct xseg_request *req = pr->req;
|
185 |
|
186 |
gio->buf = malloc(req->size); |
187 |
if (!gio->buf)
|
188 |
return -1; |
189 |
|
190 |
return 0; |
191 |
} |
192 |
|
193 |
/*
 * Set up a copy request: allocate gio->second_name, fill it with the
 * copy source taken from the xseg_request_copy payload (silently
 * truncated to MAX_OBJ_NAME), reset the read counter and allocate the
 * bounce buffer.  Returns 0 on success, -1 on allocation failure.
 */
static int prepare_copy(struct peer_req *pr)
{
	struct peerd *peer = pr->peer;
	struct gluster_io *gio = (struct gluster_io *) pr->priv;
	char *data = xseg_get_data(peer->xseg, pr->req);
	struct xseg_request_copy *xcopy = (struct xseg_request_copy *)data;
	/* Clamp the source name length so second_name cannot overflow. */
	unsigned int end = (xcopy->targetlen > MAX_OBJ_NAME) ?
		MAX_OBJ_NAME : xcopy->targetlen;
	int r;

	r = allocate_gio_secname(pr);
	if (r < 0)
		return r;

	/* FIXME: terminate or fail if targetlen > MAX_OBJ_NAME ? */
	strncpy(gio->second_name, xcopy->target, end);
	gio->second_name[end] = 0;
	gio->read = 0;

	r = allocate_gio_buf(pr);
	return r;
}
215 |
|
216 |
/*
 * Set up a hash request: allocate gio->second_name (used to hold the
 * stored hash contents), build the "<obj>_hash" name into the caller's
 * hash_name buffer and allocate the bounce buffer.
 * Returns 0 on success, -1 on allocation failure.
 */
static int prepare_hash(struct peer_req *pr, char *hash_name)
{
	int r;

	r = allocate_gio_secname(pr);
	if (r < 0)
		return r;

	create_hash_name(pr, hash_name);

	r = allocate_gio_buf(pr);
	return r;
}
229 |
|
230 |
/*
 * Write our unique string at offset 0 of the lock-file fd, retrying
 * short writes and EINTR until the whole string is out.
 * Returns 0 on success, -1 on any other error.
 *
 * NOTE(review): a glfs_pwrite() return of 0 with a stale errno of EINTR
 * would spin here; presumably glfs_pwrite never returns 0 for a nonzero
 * count — confirm against libgfapi semantics.
 */
static int write_lock_owner(struct peer_req *pr, glfs_fd_t *fd)
{
	struct peerd *peer = pr->peer;
	struct glusterd *gluster = __get_gluster(peer);
	char *me = gluster->uniquestr;
	int len = strlen(me);
	int serviced = 0;
	int r = 0;

	while (len > serviced) {
		r = glfs_pwrite(fd, me + serviced, len - serviced,
				0 + serviced, 0);
		if (r > 0)
			serviced += r;
		else if (errno != EINTR)
			return -1;
	}

	return 0;
}
250 |
|
251 |
/*
 * Read the lock owner's identity from offset 0 of fd into 'owner'.
 * At most strlen(our uniquestr) bytes are read; EINTR is retried and
 * EOF stops the loop early.  The buffer is NOT NUL-terminated here.
 * Returns the number of bytes read, or -1 on error.
 *
 * NOTE(review): because only as many bytes as *our* uniquestr are read,
 * an owner whose string merely starts with ours would compare equal in
 * validate_lock_owner() — confirm uniquestr values cannot be prefixes
 * of one another.
 */
static int read_lock_owner(struct peer_req *pr, glfs_fd_t *fd, char *owner)
{
	struct peerd *peer = pr->peer;
	struct glusterd *gluster = __get_gluster(peer);
	char *buf = gluster->uniquestr;
	int len = strlen(buf);
	int serviced = 0;
	int r = 0;

	while (len > serviced) {
		r = glfs_pread(fd, owner + serviced, len - serviced,
				0 + serviced, 0);
		if (r > 0)
			serviced += r;
		else if (r == 0)
			break;
		else if (errno != EINTR)
			return -1;
	}

	return serviced;
}
273 |
|
274 |
/*
 * Compare the lock-file's recorded owner with our unique string.
 * Returns 0 when the lock is ours, -1 on read failure or when another
 * instance owns it.
 */
static int validate_lock_owner(struct peer_req *pr, glfs_fd_t *fd)
{
	struct peerd *peer = pr->peer;
	struct glusterd *gluster = __get_gluster(peer);
	char lock_owner[MAX_UNIQUESTR_LEN + 1];
	char *me = gluster->uniquestr;
	int len = strlen(me);
	int r = 0;

	r = read_lock_owner(pr, fd, lock_owner);
	if (r < 0) {
		XSEGLOG2(&lc, E, "Failed to read lock owner of fd %p", fd);
		return -1;
	}
	/*
	 * read_lock_owner() does not terminate the buffer; terminate it
	 * here so the %s log below cannot read past the end of the stack
	 * buffer.  r <= len <= MAX_UNIQUESTR_LEN, so the index is safe.
	 */
	lock_owner[r] = 0;

	if (r != len || strncmp(me, lock_owner, r) != 0) {
		XSEGLOG2(&lc, D, "Lock owner is %s while we are %s",
				lock_owner, me);
		return -1;
	}

	return 0;
}
297 |
|
298 |
/*
 * Submit a read (X_READ) or write (X_WRITE) of 'size' bytes at 'offset'
 * on gio->fd.  In async mode the glfs_*_async call returns immediately
 * and the completion callback re-dispatches the request; in sync mode
 * the call blocks and pr->retval is set here on success (on failure
 * retval is left as-is, but callers only inspect it after REQ_SUBMITTED
 * paths).  Returns the glfs call's return value, or -1 for an unknown op.
 */
static int do_aio_generic(struct peer_req *pr, uint32_t op,
		char *buf, uint64_t size, uint64_t offset)
{
	struct peerd *peer = pr->peer;
	struct glusterd *gluster = __get_gluster(peer);
	struct gluster_io *gio = (struct gluster_io *) pr->priv;
	glfs_fd_t *fd = gio->fd;
	int r;

	switch (op) {
	case X_READ:
		if (gluster->async) {
			r = glfs_pread_async(fd, buf, size, offset, 0,
					gluster_complete_read, pr);
		} else {
			r = glfs_pread(fd, buf, size, offset, 0);
			if (r >= 0)
				pr->retval = r;
		}
		break;
	case X_WRITE:
		if (gluster->async) {
			r = glfs_pwrite_async(fd, buf, size, offset, 0,
					gluster_complete_write, pr);
		} else {
			r = glfs_pwrite(fd, buf, size, offset, 0);
			if (r >= 0)
				pr->retval = r;
		}
		break;
	default:
		/* Unknown opcode: nothing was submitted. */
		return -1;
		break;
	}
	return r;
}
334 |
|
335 |
/*
 * Kick off a read of 'size' bytes at 'offset' into 'buf'.
 * Returns REQ_SUBMITTED when the submission succeeded, REQ_FAILED
 * otherwise.
 */
static int begin_aio_read(struct peer_req *pr, char *buf,
		uint64_t size, uint64_t offset)
{
	return do_aio_generic(pr, X_READ, buf, size, offset) >= 0 ?
		REQ_SUBMITTED : REQ_FAILED;
}
346 |
|
347 |
/*
 * Kick off a write of 'size' bytes at 'offset' from 'buf'.
 * Returns REQ_SUBMITTED when the submission succeeded, REQ_FAILED
 * otherwise.
 */
static int begin_aio_write(struct peer_req *pr, char *buf,
		uint64_t size, uint64_t offset)
{
	return do_aio_generic(pr, X_WRITE, buf, size, offset) >= 0 ?
		REQ_SUBMITTED : REQ_FAILED;
}
358 |
|
359 |
/*
 * Completion step for a (possibly partial) read.  pr->retval holds the
 * last call's result: negative means failure; zero means EOF, in which
 * case the rest of the buffer is zero-filled and the request completes;
 * positive means more data arrived, so account it and resubmit the
 * remainder if any.  Returns REQ_COMPLETED, REQ_SUBMITTED or REQ_FAILED.
 */
static int complete_aio_read(struct peer_req *pr, char *buf,
		uint64_t size, uint64_t offset, uint64_t *serviced)
{
	int r = 0;

	/* Leave on fail or if there are no other data */
	if (pr->retval < 0) {
		/* TODO: check errors */
		return REQ_FAILED;
	} else if (pr->retval == 0) {
		XSEGLOG2(&lc, I, "Zeroing rest of data");
		memset(buf + *serviced, 0, size - *serviced);
		*serviced = size;
		return REQ_COMPLETED;
	}

	/*
	 * Else, check if all data have been served and resubmit if necessary
	 */
	*serviced += pr->retval;
	if (*serviced == size) {
		return REQ_COMPLETED;
	} else {
		r = do_aio_generic(pr, X_READ, buf + *serviced,
				size - *serviced, offset + *serviced);
		if (r >= 0)
			return REQ_SUBMITTED;
		else
			return REQ_FAILED;
	}
}
390 |
|
391 |
/*
 * Completion step for a (possibly partial) write.  pr->retval holds the
 * last call's result: negative means failure; otherwise account the
 * bytes written in *serviced and resubmit the remainder if any.
 * Returns REQ_COMPLETED, REQ_SUBMITTED or REQ_FAILED.
 *
 * (The original ended with an unreachable "return r;" after an
 * exhaustive if/else — dead code, removed.)
 */
static int complete_aio_write(struct peer_req *pr, char *buf,
		uint64_t size, uint64_t offset, uint64_t *serviced)
{
	int r;

	/* Leave on fail */
	if (pr->retval < 0) {
		/* TODO: check errors */
		return REQ_FAILED;
	}

	/* Check if all data have been served; resubmit the tail if not. */
	*serviced += pr->retval;
	if (*serviced == size)
		return REQ_COMPLETED;

	r = do_aio_generic(pr, X_WRITE, buf + *serviced,
			size - *serviced, offset + *serviced);
	if (r >= 0)
		return REQ_SUBMITTED;

	return REQ_FAILED;
}
419 |
|
420 |
/*
 * Create 'target' on the volume with O_RDWR|O_CREAT|O_TRUNC plus the
 * caller-supplied extra flags (e.g. O_EXCL for lock files), mode 0600.
 * Returns the open fd, or NULL on failure (caller checks errno; EEXIST
 * with O_EXCL means someone else created it first).
 */
static glfs_fd_t *do_block_create(struct peer_req *pr, char *target, int mode)
{
	struct peerd *peer = pr->peer;
	struct glusterd *gluster = __get_gluster(peer);
	glfs_t *glfs = __get_glfs(gluster);
	glfs_fd_t *fd = NULL;

	/*
	 * Create the requested file (in O_EXCL mode if requested)
	 * If errno is EINTR, retry. If it is other that EEXIST or EINTR,
	 * leave.
	 */
	XSEGLOG2(&lc, D, "Creating target %s", target);
	do {
		errno = 0;
		fd = glfs_creat(glfs, target,
				O_RDWR | O_CREAT | O_TRUNC | mode,
				S_IRUSR | S_IWUSR);

		if (fd)
			return fd;
		if (errno != EEXIST && errno != EINTR) {
			XSEGLOG2(&lc, E, "Unexpected error (errno %d) while "
					"creating %s", errno, target);
			return fd;
		}
	} while (errno == EINTR);

	/* Reached only with errno == EEXIST; fd is NULL here. */
	return fd;
}
450 |
|
451 |
/*
 * Open 'target' read-write, retrying on EINTR.  If the file does not
 * exist and 'mode' contains O_CREAT, fall back to creating it (without
 * O_EXCL).  Returns the open fd, or NULL when the object is missing and
 * creation was not requested (or creation itself failed).
 */
static glfs_fd_t *do_block_open(struct peer_req *pr, char *target, int mode)
{
	struct peerd *peer = pr->peer;
	struct glusterd *gluster = __get_gluster(peer);
	glfs_t *glfs = __get_glfs(gluster);
	glfs_fd_t *fd;

	/*
	 * Open the requested file.
	 * If errno is EINTR, retry. If it is other that ENOENT or EINTR,
	 * leave.
	 */
	XSEGLOG2(&lc, D, "Opening target %s", target);
	do {
		errno = 0;
		fd = glfs_open(glfs, target, O_RDWR);

		if (fd)
			return fd;
		if (errno != ENOENT && errno != EINTR) {
			XSEGLOG2(&lc, E, "Unexpected error (errno %d) while "
					"opening %s:", errno, target);
			return fd;
		}
	} while (errno == EINTR);

	/* The object is missing (ENOENT); only create it if asked to. */
	if (!fd && !(mode & O_CREAT))
		return NULL;

	/*
	 * Create the requested file only if user has demanded so.
	 * If errno is EINTR, retry. Else, leave.
	 */
	fd = do_block_create(pr, target, 0);
	return fd;
}
487 |
|
488 |
static int do_block_close(struct peer_req *pr) |
489 |
{ |
490 |
struct gluster_io *gio = (struct gluster_io *)(pr->priv); |
491 |
glfs_fd_t *fd = gio->fd; |
492 |
int r;
|
493 |
|
494 |
XSEGLOG2(&lc, D, "Closing fd %p", fd);
|
495 |
if (!fd)
|
496 |
return 0; |
497 |
|
498 |
r = glfs_close(fd); |
499 |
if (r < 0) |
500 |
XSEGLOG2(&lc, E, "Unexpected error (errno %d) while "
|
501 |
"closing fd %p:", errno, fd);
|
502 |
|
503 |
return r;
|
504 |
} |
505 |
|
506 |
/*
|
507 |
* do_block_lock() does the following:
|
508 |
* a. Checks if there is a lock-file for this object.
|
509 |
* i. Checks if the lock-file has been placed by us.
|
510 |
* ii. If we own the lock-file or if we are in XF_NOSYNC mode, it returns.
|
511 |
* iii. Else, it proceeds to (b)
|
512 |
* b. Attempts to create its own lockfile (obj_name + lock suffix). If it
|
513 |
* succeeds or is in XF_NOSYNC mode, it returns.
|
514 |
* c. If it has not locked the target object and is NOT in XF_NOSYNC mode, it
|
515 |
* sleeps for 1 second and it reattempts.
|
516 |
*/
|
517 |
int do_block_lock(struct peer_req *pr, char *target, int mode) |
518 |
{ |
519 |
glfs_fd_t *fd = NULL;
|
520 |
int r;
|
521 |
|
522 |
XSEGLOG2(&lc, D, "Locking target %s", target);
|
523 |
do {
|
524 |
fd = do_block_open(pr, target, 0);
|
525 |
if (fd) {
|
526 |
r = validate_lock_owner(pr, fd); |
527 |
if (r > 0 || mode == XF_NOSYNC) |
528 |
break;
|
529 |
} |
530 |
|
531 |
fd = do_block_create(pr, target, O_EXCL); |
532 |
if (fd) {
|
533 |
r = write_lock_owner(pr, fd); |
534 |
break;
|
535 |
} else if (mode == XF_NOSYNC) { |
536 |
r = -1;
|
537 |
break;
|
538 |
} |
539 |
|
540 |
sleep(1);
|
541 |
} while (1); |
542 |
|
543 |
glfs_close(fd); |
544 |
|
545 |
return r;
|
546 |
} |
547 |
|
548 |
/*
|
549 |
* do_block_unlock() does the following:
|
550 |
* a. Checks if there is a lock-file for this object. If not, it can safely
|
551 |
* return.
|
552 |
* b. Checks if the lock-file has been placed by us.
|
553 |
* ii. If we own the lock-file or if we are in XF_FORCE mode, we unlink it.
|
554 |
* c. If the unlinking was successful or if it failed due to ENOENT, the
|
555 |
* unlock operation is considered successful.
|
556 |
*/
|
557 |
int do_block_unlock(struct peer_req *pr, char *target, int mode) |
558 |
{ |
559 |
struct peerd *peer = pr->peer;
|
560 |
struct glusterd *gluster = __get_gluster(peer);
|
561 |
glfs_t *glfs = __get_glfs(gluster); |
562 |
glfs_fd_t *fd = NULL;
|
563 |
int r = 0; |
564 |
|
565 |
XSEGLOG2(&lc, D, "Unlocking target %s", target);
|
566 |
fd = do_block_open(pr, target, 0);
|
567 |
if (!fd) {
|
568 |
XSEGLOG2(&lc, D, "Target %s was not locked", target);
|
569 |
return -1; |
570 |
} |
571 |
|
572 |
r = validate_lock_owner(pr, fd); |
573 |
if (r < 0 && !(mode & XF_FORCE)) |
574 |
return -1; |
575 |
|
576 |
r = glfs_unlink(glfs, target); |
577 |
if (r >= 0 || errno == ENOENT) |
578 |
return 0; |
579 |
else
|
580 |
return -1; |
581 |
} |
582 |
|
583 |
/*
 * Stat 'target' on the volume into *buf.
 * Returns glfs_stat()'s result (0 on success, -1 with errno set).
 */
int do_block_stat(struct peer_req *pr, char *target, struct stat *buf)
{
	struct peerd *peer = pr->peer;
	struct glusterd *gluster = __get_gluster(peer);
	glfs_t *glfs = __get_glfs(gluster);

	XSEGLOG2(&lc, D, "Stating target %s", target);
	return glfs_stat(glfs, target, buf);
}
592 |
|
593 |
int do_block_delete(struct peer_req *pr, char *target) |
594 |
{ |
595 |
struct peerd *peer = pr->peer;
|
596 |
struct glusterd *gluster = __get_gluster(peer);
|
597 |
glfs_t *glfs = __get_glfs(gluster); |
598 |
|
599 |
XSEGLOG2(&lc, D, "Deleting target %s", target);
|
600 |
return glfs_unlink(glfs, target);
|
601 |
} |
602 |
|
603 |
int handle_delete(struct peerd *peer, struct peer_req *pr) |
604 |
{ |
605 |
struct gluster_io *gio = (struct gluster_io *) pr->priv; |
606 |
struct xseg_request *req = pr->req;
|
607 |
int r;
|
608 |
|
609 |
XSEGLOG2(&lc, D, "Started for pr %p, req %p, target %s",
|
610 |
pr, req, gio->obj_name); |
611 |
|
612 |
r = do_block_delete(pr, gio->obj_name); |
613 |
if (r < 0) { |
614 |
XSEGLOG2(&lc, E, "Deletion of %s failed", gio->obj_name);
|
615 |
fail(peer, pr); |
616 |
} else {
|
617 |
XSEGLOG2(&lc, I, "Deletion of %s completed", gio->obj_name);
|
618 |
complete(peer, pr); |
619 |
} |
620 |
return 0; |
621 |
} |
622 |
|
623 |
/*
 * X_INFO handler: stat the object and return its size in an
 * xseg_reply_info placed in the request's data area, growing the
 * request first if the data area is too small.
 * Returns 0 on success, -1 on failure (request already failed).
 */
int handle_info(struct peerd *peer, struct peer_req *pr)
{
	struct gluster_io *gio = (struct gluster_io *) pr->priv;
	struct xseg_request *req = pr->req;
	struct xseg_reply_info *xinfo;
	struct stat stat;	/* NOTE: local shadows the struct tag; legal C */
	char *req_data;
	char buf[XSEG_MAX_TARGETLEN + 1];
	char *target = xseg_get_target(peer->xseg, req);
	int r;

	XSEGLOG2(&lc, D, "Started for pr %p, req %p, target %s",
			pr, req, gio->obj_name);

	if (req->datalen < sizeof(struct xseg_reply_info)) {
		/* FIXME: Is this normal? */
		/* Save the target name, grow the data area, restore it. */
		strncpy(buf, target, req->targetlen);
		r = xseg_resize_request(peer->xseg, req, req->targetlen,
				sizeof(struct xseg_reply_info));
		if (r < 0) {
			XSEGLOG2(&lc, E, "Cannot resize request");
			fail(peer, pr);
			return -1;
		}
		target = xseg_get_target(peer->xseg, req);
		strncpy(target, buf, req->targetlen);
	}

	r = do_block_stat(pr, gio->obj_name, &stat);
	if (r < 0) {
		XSEGLOG2(&lc, E, "Stat failed for %s", gio->obj_name);
		fail(peer, pr);
		return -1;
	}

	req_data = xseg_get_data(peer->xseg, pr->req);
	xinfo = (struct xseg_reply_info *)req_data;
	xinfo->size = (uint64_t)stat.st_size;

	XSEGLOG2(&lc, I, "Getting info of %s completed", gio->obj_name);
	complete(peer, pr);
	return 0;
}
666 |
|
667 |
void handle_ping(struct peerd *peer, struct peer_req *pr) |
668 |
{ |
669 |
XSEGLOG2(&lc, D, "Ping accepted. Acknowledging...");
|
670 |
|
671 |
complete(peer, pr); |
672 |
} |
673 |
|
674 |
/*
 * X_READ handler, driven as a two-state machine:
 *  - ACCEPTED: validate the request, open the object (missing objects
 *    are served as zeroes), then submit the first read;
 *  - READING: a read finished (async callback or sync fall-through);
 *    account the data and resubmit or finish.
 * Always returns 0; the request itself is failed/completed via
 * fail()/complete().
 */
int handle_read(struct peerd *peer, struct peer_req *pr)
{
	struct gluster_io *gio = (struct gluster_io *) (pr->priv);
	struct xseg_request *req = pr->req;
	glfs_fd_t *fd;
	char *data = xseg_get_data(peer->xseg, pr->req);
	char *target = gio->obj_name;
	int ret = REQ_UNDEFINED;

	XSEGLOG2(&lc, D, "Started for pr %p, req %p, target %s",
			pr, req, target);

	if (gio->state == ACCEPTED) {
		if (req->datalen < req->size) {
			XSEGLOG2(&lc, E, "Request datalen is less than "
					"request size");
			ret = REQ_FAILED;
			goto out;
		}

		/* Zero-length reads complete immediately. */
		if (!req->size) {
			ret = REQ_COMPLETED;
			goto out;
		}

		fd = do_block_open(pr, target, 0);
		if (!fd) {
			XSEGLOG2(&lc, I, "Object %s does not exist. "
					"Serving zero data\n", target);
			/* object not found. return zeros instead */
			memset(data, 0, req->size);
			req->serviced = req->size;
			ret = REQ_COMPLETED;
			goto out;
		}
		gio->fd = fd;

		XSEGLOG2(&lc, I, "Reading %s", target);

		gio->state = READING;
		ret = begin_aio_read(pr, data, req->size, req->offset);
	} else if (gio->state == READING) {
		XSEGLOG2(&lc, I, "Reading of %s callback", target);
		ret = complete_aio_read(pr, data, req->size,
				req->offset, &req->serviced);
	}

out:
	switch (ret) {
	case REQ_FAILED:
		XSEGLOG2(&lc, E, "Reading of %s failed", target);
		do_block_close(pr);
		fail(peer, pr);
		break;
	case REQ_SUBMITTED:
		XSEGLOG2(&lc, I, "Reading of %s submitted", target);
		/* In sync mode this re-dispatches immediately. */
		submit_hook(pr);
		break;
	case REQ_COMPLETED:
		XSEGLOG2(&lc, I, "Reading of %s completed", target);
		do_block_close(pr);
		complete(peer, pr);
		break;
	default:
		XSEGLOG2(&lc, E, "Unknown request state. Failing.");
		do_block_close(pr);
		fail(peer, pr);
		break;
	}
	return 0;
}
745 |
|
746 |
/*
 * X_WRITE handler, driven as a two-state machine:
 *  - ACCEPTED: validate the request, open (or create) the object, then
 *    submit the first write;
 *  - WRITING: a write finished (async callback or sync fall-through);
 *    account the data and resubmit or finish.
 * Always returns 0; the request itself is failed/completed via
 * fail()/complete().
 */
int handle_write(struct peerd *peer, struct peer_req *pr)
{
	struct gluster_io *gio = (struct gluster_io *) (pr->priv);
	struct xseg_request *req = pr->req;
	glfs_fd_t *fd;
	char *data = xseg_get_data(peer->xseg, pr->req);
	char *target = gio->obj_name;
	int ret = REQ_UNDEFINED;

	XSEGLOG2(&lc, D, "Started for pr %p, req %p, target %s",
			pr, req, target);

	if (gio->state == ACCEPTED) {
		if (req->datalen < req->size) {
			XSEGLOG2(&lc, E, "Request datalen is less than "
					"request size");
			ret = REQ_FAILED;
			goto out;
		}

		if (!req->size) {
			/* TODO: Flush data if req->flags & XF_FLUSH */
			ret = REQ_COMPLETED;
			goto out;
		}

		/* O_CREAT: a missing object is created on write. */
		fd = do_block_open(pr, target, O_CREAT);
		if (!fd) {
			XSEGLOG2(&lc, E, "Cannot open/create %s", target);
			ret = REQ_FAILED;
			goto out;
		}
		gio->fd = fd;

		XSEGLOG2(&lc, I, "Writing %s", target);

		gio->state = WRITING;
		ret = begin_aio_write(pr, data, req->size, req->offset);
	} else if (gio->state == WRITING) {
		XSEGLOG2(&lc, I, "Writing of %s callback", target);
		ret = complete_aio_write(pr, data, req->size,
				req->offset, &req->serviced);
	}

out:
	switch (ret) {
	case REQ_FAILED:
		XSEGLOG2(&lc, E, "Writing of %s failed", target);
		do_block_close(pr);
		fail(peer, pr);
		break;
	case REQ_SUBMITTED:
		XSEGLOG2(&lc, I, "Writing of %s submitted", target);
		/* In sync mode this re-dispatches immediately. */
		submit_hook(pr);
		break;
	case REQ_COMPLETED:
		XSEGLOG2(&lc, I, "Writing of %s completed", target);
		do_block_close(pr);
		complete(peer, pr);
		break;
	default:
		XSEGLOG2(&lc, E, "Unknown request state. Failing.");
		do_block_close(pr);
		fail(peer, pr);
		break;
	}
	return 0;
}
814 |
|
815 |
/*
 * X_COPY handler: copy 'second_name' (taken from the request payload)
 * into 'obj_name' via the bounce buffer gio->buf.  State machine:
 *  - ACCEPTED: allocate buffers, open the source (a missing source is
 *    treated as zero data and skips straight to the write phase via the
 *    'write:' label), submit the read;
 *  - READING: read callback; once the source is fully in gio->buf, fall
 *    through to the write phase;
 *  - WRITING: write callback; account and resubmit or finish.
 * The scratch buffers are released on every non-SUBMITTED outcome.
 * Always returns 0.
 */
int handle_copy(struct peerd *peer, struct peer_req *pr)
{
	struct gluster_io *gio = (struct gluster_io *) (pr->priv);
	struct xseg_request *req = pr->req;
	glfs_fd_t *fd;
	char *target = gio->obj_name;
	int r;
	int ret = REQ_UNDEFINED;

	XSEGLOG2(&lc, D, "Started for pr %p, req %p, target %s",
			pr, req, target);

	if (gio->state == ACCEPTED) {
		/* Create second name and buf */
		r = prepare_copy(pr);
		if (r < 0) {
			ret = REQ_FAILED;
			goto out;
		}
		target = gio->second_name;

		XSEGLOG2(&lc, I, "Copy of object %s to object %s started",
				gio->second_name, gio->obj_name);

		if (!req->size) {
			ret = REQ_COMPLETED;
			goto out;
		}

		/* FIXME: Will we fail here? */
		fd = do_block_open(pr, target, 0);
		if (!fd) {
			XSEGLOG2(&lc, I, "Object %s does not exist. "
					"Serving zero data\n", target);
			/* object not found. return zeros instead */
			memset(gio->buf, 0, req->size);
			/* jump into the write phase inside the next branch */
			goto write;
		}
		gio->fd = fd;

		XSEGLOG2(&lc, I, "Reading %s", target);

		gio->state = READING;
		gio->read = 0;
		ret = begin_aio_read(pr, gio->buf, req->size, req->offset);
	}
	else if (gio->state == READING){
		target = gio->second_name;
		XSEGLOG2(&lc, I, "Reading of %s callback", target);

		ret = complete_aio_read(pr, gio->buf, req->size,
				req->offset, &gio->read);

		if (ret == REQ_FAILED || ret == REQ_SUBMITTED)
			goto out;

		/* Source fully read: close it and start writing. */
		do_block_close(pr);
write:
		target = gio->obj_name;
		XSEGLOG2(&lc, I, "Target is %s", target);
		fd = do_block_open(pr, target, O_CREAT);
		if (!fd) {
			XSEGLOG2(&lc, E, "Cannot open/create %s", target);
			ret = REQ_FAILED;
			goto out;
		}
		gio->fd = fd;

		XSEGLOG2(&lc, I, "Writing %s", target);

		gio->state = WRITING;
		ret = begin_aio_write(pr, gio->buf, req->size, req->offset);
	} else if (gio->state == WRITING) {
		XSEGLOG2(&lc, I, "Writing of %s callback", target);
		ret = complete_aio_write(pr, gio->buf, req->size,
				req->offset, &req->serviced);
	}

out:
	/* Scratch state is only kept while an op is still in flight. */
	if (ret != REQ_SUBMITTED) {
		free(gio->buf);
		free(gio->second_name);
		gio->buf = NULL;
		gio->second_name = NULL;
		gio->read = 0;
	}

	switch (ret) {
	case REQ_FAILED:
		XSEGLOG2(&lc, E, "Copying of %s failed", target);
		do_block_close(pr);
		fail(peer, pr);
		break;
	case REQ_SUBMITTED:
		submit_hook(pr);
		break;
	case REQ_COMPLETED:
		XSEGLOG2(&lc, I, "Copying of %s completed", target);
		do_block_close(pr);
		complete(peer, pr);
		break;
	default:
		XSEGLOG2(&lc, E, "Unknown request state. Failing.");
		do_block_close(pr);
		fail(peer, pr);
		break;
	}
	return 0;
}
924 |
|
925 |
int handle_hash(struct peerd *peer, struct peer_req *pr) |
926 |
{ |
927 |
struct xseg_request *req = pr->req;
|
928 |
struct gluster_io *gio = (struct gluster_io *) pr->priv; |
929 |
struct xseg_reply_hash *xreply;
|
930 |
glfs_fd_t *fd; |
931 |
uint64_t trailing_zeros = 0;
|
932 |
unsigned char sha[SHA256_DIGEST_SIZE]; |
933 |
char hash_name[HEXLIFIED_SHA256_DIGEST_SIZE + 1]; |
934 |
uint32_t pos; |
935 |
char *target = gio->obj_name;
|
936 |
int r;
|
937 |
int ret = REQ_UNDEFINED;
|
938 |
|
939 |
XSEGLOG2(&lc, D, "Started for pr %p, req %p, target %s",
|
940 |
pr, req, target); |
941 |
|
942 |
if (gio->state == ACCEPTED){
|
943 |
XSEGLOG2(&lc, I, "Starting hashing of object %s", target);
|
944 |
|
945 |
if (!req->size) {
|
946 |
ret = REQ_COMPLETED; /* or fail? */
|
947 |
goto out;
|
948 |
} |
949 |
|
950 |
/* Create hash_name, gio second_name and gio->buf */
|
951 |
r = prepare_hash(pr, hash_name); |
952 |
if (r < 0) { |
953 |
ret = REQ_FAILED; |
954 |
goto out;
|
955 |
} |
956 |
target = hash_name; |
957 |
gio->state = PREHASHING; |
958 |
|
959 |
/* Get correct status */
|
960 |
fd = do_block_open(pr, target, 0);
|
961 |
if (!fd) {
|
962 |
XSEGLOG2(&lc, I, "Hash %s does not exist.", target);
|
963 |
goto read;
|
964 |
} |
965 |
gio->fd = fd; |
966 |
|
967 |
XSEGLOG2(&lc, I, "Reading %s", target);
|
968 |
/* Read contents of hash_name in the gio->second_name buffer */
|
969 |
ret = begin_aio_read(pr, gio->second_name, |
970 |
HEXLIFIED_SHA256_DIGEST_SIZE, req->offset); |
971 |
} else if (gio->state == PREHASHING) { |
972 |
target = hash_name; |
973 |
XSEGLOG2(&lc, I, "Reading of %s callback", target);
|
974 |
ret = complete_aio_read(pr, gio->second_name, |
975 |
HEXLIFIED_SHA256_DIGEST_SIZE, |
976 |
req->offset, &gio->read); |
977 |
|
978 |
if (ret == REQ_FAILED || ret == REQ_SUBMITTED)
|
979 |
goto out;
|
980 |
|
981 |
/* Construct answer */
|
982 |
XSEGLOG2(&lc, D, "Precalculated hash found");
|
983 |
xreply = (struct xseg_reply_hash*)xseg_get_data(peer->xseg, req);
|
984 |
r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen, |
985 |
sizeof(struct xseg_reply_hash)); |
986 |
strncpy(xreply->target, gio->second_name, HEXLIFIED_SHA256_DIGEST_SIZE); |
987 |
xreply->targetlen = HEXLIFIED_SHA256_DIGEST_SIZE; |
988 |
|
989 |
XSEGLOG2(&lc, I, "Calculated %s as hash of %s",
|
990 |
gio->second_name, gio->obj_name); |
991 |
req->serviced = req->size; |
992 |
goto out;
|
993 |
/* Leave */
|
994 |
|
995 |
read:
|
996 |
/*
|
997 |
* We reach this point only if there was no precalculated hash
|
998 |
*/
|
999 |
gio->state = READING; |
1000 |
gio->read = 0;
|
1001 |
target = gio->obj_name; |
1002 |
|
1003 |
fd = do_block_open(pr, target, 0);
|
1004 |
if (!fd) {
|
1005 |
XSEGLOG2(&lc, I, "Original object %s does not "
|
1006 |
"exist.\nServing zeroes.", target);
|
1007 |
memset(gio->buf, 0, req->size);
|
1008 |
gio->read = req->size; |
1009 |
goto hash;
|
1010 |
} |
1011 |
gio->fd = fd; |
1012 |
|
1013 |
XSEGLOG2(&lc, I, "Reading %s", target);
|
1014 |
ret = begin_aio_read(pr, gio->buf, req->size, req->offset); |
1015 |
} else if (gio->state == READING) { |
1016 |
target = gio->obj_name; |
1017 |
XSEGLOG2(&lc, I, "Reading of %s callback", target);
|
1018 |
ret = complete_aio_read(pr, gio->buf, req->size, |
1019 |
req->offset, &gio->read); |
1020 |
|
1021 |
if (ret == REQ_FAILED || ret == REQ_SUBMITTED)
|
1022 |
goto out;
|
1023 |
|
1024 |
do_block_close(pr); |
1025 |
|
1026 |
hash:
|
1027 |
/* Strip here trailing zeroes */
|
1028 |
for (; trailing_zeros < gio->read; trailing_zeros++) {
|
1029 |
if (gio->buf[gio->read-trailing_zeros -1]) |
1030 |
break;
|
1031 |
} |
1032 |
XSEGLOG2(&lc, D, "Read %llu, Trailing zeros %llu",
|
1033 |
gio->read, trailing_zeros); |
1034 |
|
1035 |
gio->read -= trailing_zeros; |
1036 |
SHA256((unsigned char *) gio->buf, gio->read, sha); |
1037 |
hexlify(sha, SHA256_DIGEST_SIZE, gio->second_name); |
1038 |
gio->second_name[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
|
1039 |
|
1040 |
/* Construct reply */
|
1041 |
xreply = (struct xseg_reply_hash*)xseg_get_data(peer->xseg, req);
|
1042 |
r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen, |
1043 |
sizeof(struct xseg_reply_hash)); |
1044 |
strncpy(xreply->target, gio->second_name, HEXLIFIED_SHA256_DIGEST_SIZE); |
1045 |
xreply->targetlen = HEXLIFIED_SHA256_DIGEST_SIZE; |
1046 |
|
1047 |
XSEGLOG2(&lc, I, "Calculated %s as hash of %s",
|
1048 |
gio->second_name, gio->obj_name); |
1049 |
/*
|
1050 |
* We can't leave since we need to write the
|
1051 |
* content-addressable object to the backend.
|
1052 |
*/
|
1053 |
|
1054 |
gio->read = 0;
|
1055 |
fd = do_block_create(pr, gio->second_name, O_EXCL); |
1056 |
if (!fd) {
|
1057 |
XSEGLOG2(&lc, I, "Hash of object %s to object %s completed",
|
1058 |
gio->obj_name, gio->second_name); |
1059 |
req->serviced = req->size; |
1060 |
goto write;
|
1061 |
} |
1062 |
gio->fd = fd; |
1063 |
target = gio->second_name; |
1064 |
|
1065 |
XSEGLOG2(&lc, I, "Writing %s", target);
|
1066 |
|
1067 |
gio->state = WRITING; |
1068 |
ret = begin_aio_write(pr, gio->buf, req->size, req->offset); |
1069 |
} else if (gio->state == WRITING) { |
1070 |
target = gio->second_name; |
1071 |
XSEGLOG2(&lc, I, "Writing of %s callback", target);
|
1072 |
ret = complete_aio_write(pr, gio->buf, req->size, req->offset, |
1073 |
&req->serviced); |
1074 |
|
1075 |
if (ret == REQ_FAILED || ret == REQ_SUBMITTED)
|
1076 |
goto out;
|
1077 |
|
1078 |
XSEGLOG2(&lc, I, "Writing of %s completed", gio->second_name);
|
1079 |
|
1080 |
/*
|
1081 |
* We can't leave since we need to write the precalculated hash
|
1082 |
* value to the backend.
|
1083 |
*/
|
1084 |
do_block_close(pr); |
1085 |
write:
|
1086 |
pos = 0;
|
1087 |
strncpy(hash_name, gio->obj_name, strlen(gio->obj_name)); |
1088 |
pos += strlen(gio->obj_name); |
1089 |
strncpy(hash_name+pos, HASH_SUFFIX, HASH_SUFFIX_LEN); |
1090 |
pos += HASH_SUFFIX_LEN; |
1091 |
hash_name[pos] = 0;
|
1092 |
|
1093 |
gio->state = POSTHASHING; |
1094 |
fd = do_block_create(pr, hash_name, O_EXCL); |
1095 |
if (!fd) {
|
1096 |
XSEGLOG2(&lc, I, "Writing of prehashed value completed");
|
1097 |
XSEGLOG2(&lc, I, "Hash of object %s to object %s completed",
|
1098 |
gio->obj_name, gio->second_name); |
1099 |
req->serviced = req->size; |
1100 |
ret = REQ_COMPLETED; |
1101 |
/* TODO: Check errors */
|
1102 |
goto out;
|
1103 |
} |
1104 |
gio->fd = fd; |
1105 |
target = hash_name; |
1106 |
|
1107 |
XSEGLOG2(&lc, I, "Writing prehashed value");
|
1108 |
ret = begin_aio_write(pr, gio->second_name, |
1109 |
HEXLIFIED_SHA256_DIGEST_SIZE, 0);
|
1110 |
} else if (gio->state == POSTHASHING) { |
1111 |
XSEGLOG2(&lc, I, "Writing of prehashed value callback");
|
1112 |
ret = complete_aio_write(pr, gio->second_name, |
1113 |
HEXLIFIED_SHA256_DIGEST_SIZE, 0, &gio->read);
|
1114 |
|
1115 |
if (ret == REQ_FAILED || ret == REQ_SUBMITTED)
|
1116 |
goto out;
|
1117 |
|
1118 |
XSEGLOG2(&lc, I, "Writing of prehashed value completed");
|
1119 |
req->serviced = req->size; |
1120 |
} |
1121 |
|
1122 |
out:
|
1123 |
target = gio->obj_name; |
1124 |
if (ret != REQ_SUBMITTED) {
|
1125 |
free(gio->buf); |
1126 |
free(gio->second_name); |
1127 |
gio->buf = NULL;
|
1128 |
gio->second_name = NULL;
|
1129 |
gio->read = 0;
|
1130 |
} |
1131 |
|
1132 |
switch (ret) {
|
1133 |
case REQ_FAILED:
|
1134 |
XSEGLOG2(&lc, E, "Hashing of %s failed", target);
|
1135 |
do_block_close(pr); |
1136 |
fail(peer, pr); |
1137 |
break;
|
1138 |
case REQ_SUBMITTED:
|
1139 |
submit_hook(pr); |
1140 |
break;
|
1141 |
case REQ_COMPLETED:
|
1142 |
XSEGLOG2(&lc, I, "Hashing of %s completed", target);
|
1143 |
do_block_close(pr); |
1144 |
complete(peer, pr); |
1145 |
break;
|
1146 |
default:
|
1147 |
XSEGLOG2(&lc, E, "Unknown request state. Failing.");
|
1148 |
do_block_close(pr); |
1149 |
fail(peer, pr); |
1150 |
break;
|
1151 |
} |
1152 |
return 0; |
1153 |
} |
1154 |
|
1155 |
int handle_acquire(struct peerd *peer, struct peer_req *pr) |
1156 |
{ |
1157 |
struct gluster_io *gio = (struct gluster_io *)(pr->priv); |
1158 |
struct xseg_request *req = pr->req;
|
1159 |
uint32_t len = strlen(gio->obj_name); |
1160 |
int ret;
|
1161 |
|
1162 |
strncpy(gio->obj_name + len, LOCK_SUFFIX, LOCK_SUFFIX_LEN); |
1163 |
gio->obj_name[len + LOCK_SUFFIX_LEN] = 0;
|
1164 |
|
1165 |
XSEGLOG2(&lc, I, "Starting lock op for %s", gio->obj_name);
|
1166 |
|
1167 |
/* TODO: Check error codes and retry if needed */
|
1168 |
ret = do_block_lock(pr, gio->obj_name, req->flags & XF_NOSYNC); |
1169 |
if (ret < 0) { |
1170 |
XSEGLOG2(&lc, E, "Lock op failed for %s", gio->obj_name);
|
1171 |
fail(peer, pr); |
1172 |
} else {
|
1173 |
XSEGLOG2(&lc, I, "Lock op succeeded for %s", gio->obj_name);
|
1174 |
complete(peer, pr); |
1175 |
} |
1176 |
return 0; |
1177 |
} |
1178 |
|
1179 |
|
1180 |
int handle_release(struct peerd *peer, struct peer_req *pr) |
1181 |
{ |
1182 |
struct gluster_io *gio = (struct gluster_io *)(pr->priv); |
1183 |
struct xseg_request *req = pr->req;
|
1184 |
uint32_t len = strlen(gio->obj_name); |
1185 |
int ret;
|
1186 |
|
1187 |
strncpy(gio->obj_name + len, LOCK_SUFFIX, LOCK_SUFFIX_LEN); |
1188 |
gio->obj_name[len + LOCK_SUFFIX_LEN] = 0;
|
1189 |
|
1190 |
XSEGLOG2(&lc, I, "Starting unlock op for %s", gio->obj_name);
|
1191 |
|
1192 |
/* TODO: Check error codes and retry if needed */
|
1193 |
ret = do_block_unlock(pr, gio->obj_name, req->flags & XF_FORCE); |
1194 |
if (ret < 0) { |
1195 |
XSEGLOG2(&lc, E, "Unlock op failed for %s", gio->obj_name);
|
1196 |
fail(peer, pr); |
1197 |
} else {
|
1198 |
XSEGLOG2(&lc, I, "Unlock op succeeded for %s", gio->obj_name);
|
1199 |
complete(peer, pr); |
1200 |
} |
1201 |
return 0; |
1202 |
} |
1203 |
|
1204 |
int custom_peer_init(struct peerd *peer, int argc, char *argv[]) |
1205 |
{ |
1206 |
struct glusterd *gluster = malloc(sizeof(struct glusterd)); |
1207 |
struct gluster_io *gio;
|
1208 |
glfs_t *glfs = NULL;
|
1209 |
char transport[MAX_GLFS_ARG_LEN + 1]; |
1210 |
char server[MAX_GLFS_ARG_LEN + 1]; |
1211 |
char volume[MAX_GLFS_ARG_LEN + 1]; |
1212 |
char uniquestr[MAX_UNIQUESTR_LEN + 1]; |
1213 |
int tid;
|
1214 |
int port = 0; |
1215 |
int ret = 0; |
1216 |
int i = 0; |
1217 |
int j = 0; |
1218 |
|
1219 |
if (!gluster) {
|
1220 |
perror("malloc");
|
1221 |
return -1; |
1222 |
} |
1223 |
gluster->async = 0;
|
1224 |
|
1225 |
uniquestr[0] = 0; |
1226 |
transport[0] = 0; |
1227 |
server[0] = 0; |
1228 |
volume[0] = 0; |
1229 |
|
1230 |
BEGIN_READ_ARGS(argc, argv); |
1231 |
READ_ARG_STRING("--transport", transport, MAX_GLFS_ARG_LEN);
|
1232 |
READ_ARG_STRING("--server", server, MAX_GLFS_ARG_LEN);
|
1233 |
READ_ARG_ULONG("--port", port);
|
1234 |
READ_ARG_STRING("--volume", volume, MAX_GLFS_ARG_LEN);
|
1235 |
READ_ARG_STRING("--uniquestr", gluster->uniquestr, MAX_UNIQUESTR_LEN);
|
1236 |
READ_ARG_BOOL("--async", gluster->async);
|
1237 |
END_READ_ARGS(); |
1238 |
|
1239 |
|
1240 |
if (!volume[0]){ |
1241 |
XSEGLOG2(&lc, E , "Volume must be provided");
|
1242 |
usage(argv[0]);
|
1243 |
goto err_arg;
|
1244 |
} |
1245 |
|
1246 |
/* Use defaults if user has not provided his/her own */
|
1247 |
if (!transport[0]) |
1248 |
strncpy(transport, "tcp", 4); |
1249 |
|
1250 |
if (!server[0]) |
1251 |
strncpy(server, "127.0.0.1", 10); |
1252 |
|
1253 |
if (!uniquestr[0]) { |
1254 |
tid = getpid(); |
1255 |
snprintf(gluster->uniquestr, MAX_UNIQUESTR_LEN, "%d", tid);
|
1256 |
XSEGLOG2(&lc, W, "Warning, uniquestr not provided.\n"
|
1257 |
"\tUsing instead \"%s\" but uniqueness is not "
|
1258 |
"guaranteed.", gluster->uniquestr);
|
1259 |
} else {
|
1260 |
strncpy(gluster->uniquestr, uniquestr, MAX_UNIQUESTR_LEN); |
1261 |
} |
1262 |
|
1263 |
ret = __set_glfs(gluster, volume); |
1264 |
if (ret < 0) { |
1265 |
XSEGLOG2(&lc, E, "Error at glfs_new");
|
1266 |
goto err_glfs;
|
1267 |
} |
1268 |
glfs = __get_glfs(gluster); |
1269 |
|
1270 |
ret = glfs_set_volfile_server(glfs, transport, server, port); |
1271 |
if (ret < 0) { |
1272 |
XSEGLOG2(&lc, E, "Error at glfs_set_volfile_server");
|
1273 |
goto err_glfs;
|
1274 |
} |
1275 |
|
1276 |
ret = glfs_init(glfs); |
1277 |
if (ret) {
|
1278 |
XSEGLOG2(&lc, E, "Error at glfs_init\n");
|
1279 |
goto err_glfs;
|
1280 |
} |
1281 |
|
1282 |
peer->priv = (void *)gluster;
|
1283 |
for (i = 0; i < peer->nr_ops; i++) { |
1284 |
gio = malloc(sizeof(struct gluster_io)); |
1285 |
|
1286 |
if (!gio)
|
1287 |
goto err_gio;
|
1288 |
|
1289 |
gio->fd = NULL;
|
1290 |
gio->buf = 0;
|
1291 |
gio->read = 0;
|
1292 |
gio->size = 0;
|
1293 |
gio->second_name = 0;
|
1294 |
peer->peer_reqs[i].priv = (void *) gio;
|
1295 |
} |
1296 |
return 0; |
1297 |
|
1298 |
err_arg:
|
1299 |
free(gluster); |
1300 |
err_gio:
|
1301 |
for (j = 0; j < i; j++) |
1302 |
free(peer->peer_reqs[j].priv); |
1303 |
err_glfs:
|
1304 |
glfs_fini(glfs); |
1305 |
return -1; |
1306 |
} |
1307 |
|
1308 |
// nothing to do here for now
|
1309 |
/*
 * Peer-specific argument-parsing hook required by the generic peer
 * framework. All glusterd options are consumed in custom_peer_init(),
 * so this is intentionally a no-op that reports success.
 */
int custom_arg_parse(int argc, const char *argv[])
{
	return 0;
}
1313 |
|
1314 |
void custom_peer_finalize(struct peerd *peer) |
1315 |
{ |
1316 |
struct glusterd *gluster = __get_gluster(peer);
|
1317 |
glfs_t *glfs = __get_glfs(gluster); |
1318 |
|
1319 |
glfs_fini(glfs); |
1320 |
XSEGLOG2(&lc, I, "Glusterd has closed successfully");
|
1321 |
} |
1322 |
|
1323 |
int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req, |
1324 |
enum dispatch_reason reason)
|
1325 |
{ |
1326 |
struct gluster_io *gio = (struct gluster_io *) (pr->priv); |
1327 |
char *target = xseg_get_target(peer->xseg, pr->req);
|
1328 |
unsigned int end = (pr->req->targetlen > MAX_OBJ_NAME) ? |
1329 |
MAX_OBJ_NAME : pr->req->targetlen; |
1330 |
|
1331 |
if (reason == dispatch_accept) {
|
1332 |
strncpy(gio->obj_name, target, end); |
1333 |
gio->obj_name[end] = 0;
|
1334 |
gio->state = ACCEPTED; |
1335 |
gio->fd = NULL;
|
1336 |
gio->read = 0;
|
1337 |
} |
1338 |
|
1339 |
switch (pr->req->op){
|
1340 |
case X_READ:
|
1341 |
handle_read(peer, pr); |
1342 |
break;
|
1343 |
case X_WRITE:
|
1344 |
handle_write(peer, pr); |
1345 |
break;
|
1346 |
case X_DELETE:
|
1347 |
if (canDefer(peer))
|
1348 |
defer_request(peer, pr); |
1349 |
else
|
1350 |
handle_delete(peer, pr); |
1351 |
break;
|
1352 |
case X_INFO:
|
1353 |
if (canDefer(peer))
|
1354 |
defer_request(peer, pr); |
1355 |
else
|
1356 |
handle_info(peer, pr); |
1357 |
break;
|
1358 |
case X_COPY:
|
1359 |
if (canDefer(peer))
|
1360 |
defer_request(peer, pr); |
1361 |
else
|
1362 |
handle_copy(peer, pr); |
1363 |
break;
|
1364 |
case X_ACQUIRE:
|
1365 |
handle_acquire(peer, pr); |
1366 |
break;
|
1367 |
case X_RELEASE:
|
1368 |
handle_release(peer, pr); |
1369 |
break;
|
1370 |
case X_HASH:
|
1371 |
handle_hash(peer, pr); |
1372 |
break;
|
1373 |
case X_PING:
|
1374 |
handle_ping(peer, pr); |
1375 |
break;
|
1376 |
default:
|
1377 |
fail(peer, pr); |
1378 |
} |
1379 |
return 0; |
1380 |
} |