13 #include <xseg/xseg.h>
15 #define TARGET_NAMELEN 128
/* Print the blockd command-line synopsis and its option list to stdout. */
17 static int usage(void)
19 printf("Usage: ./blockd <path_to_disk_image> [options]\n"
20 "Options: [-p portno]\n"
21 " [-s image size in bytes]\n"
22 " [-g type:name:nr_ports:nr_requests:request_size:extra_size:page_shift]\n"
23 " [-n nr_parallel_ops]\n");
/* Request attached to an io slot; presumably the member the handlers in this
 * file read as io->req -- confirm against the enclosing struct. */
29 struct xseg_request *req;
/* Port handle; blockd() stores the result of xseg_bind_port() in store->xport. */
35 struct xseg_port *xport;
/* Target name; blockd() fills store->name with the image path, truncated to
 * TARGET_NAMELEN - 1 characters and NUL-terminated. */
38 char name[TARGET_NAMELEN];
/* Queue of io slot indices: pending_io() appends at the tail,
 * get_pending_io() pops from the head. */
44 struct xq pending_ops;
/* Notification settings copied into every aiocb by prepare_io();
 * blockd() configures them as SIGEV_SIGNAL with SIGIO. */
47 struct sigevent sigevent;
/* File-scope counter; presumably bumped by sigaction_handler() on each
 * delivered signal -- TODO confirm against the handler body. */
50 static unsigned long sigaction_count;
/* Three-argument (SA_SIGINFO style) signal handler; blockd() installs it
 * for SIGIO through sigaction(). */
52 static void sigaction_handler(int sig, siginfo_t *siginfo, void *arg)
/* Take a slot index from the head of store->free_ops and return a pointer to
 * that slot inside the store->ios array.
 * NOTE(review): presumably yields NULL when no free slot is left, since
 * blockd_loop() tests the result -- confirm. */
57 static struct io *alloc_io(struct store *store)
59 xqindex idx = xq_pop_head(&store->free_ops);
62 return store->ios + idx;
/* Give an io slot back to the free queue. The slot index is recovered as the
 * pointer offset of io from the start of store->ios, then placed at the head
 * of store->free_ops (so alloc_io() hands it out again first). */
65 static inline void free_io(struct store *store, struct io *io)
67 xqindex idx = io - store->ios;
69 xq_append_head(&store->free_ops, idx);
/* Queue an io slot on store->pending_ops. The slot index (offset of io within
 * store->ios) is appended at the tail; get_pending_io() pops from the head,
 * so slots come back out in the order they were queued. */
72 static inline void pending_io(struct store *store, struct io *io)
74 xqindex idx = io - store->ios;
75 xq_append_tail(&store->pending_ops, idx);
/* Pop the oldest index from the head of store->pending_ops and return the
 * matching slot in store->ios.
 * NOTE(review): presumably yields NULL when the queue is empty, since
 * blockd_loop() tests the result -- confirm. */
78 static inline struct io *get_pending_io(struct store *store)
80 xqindex idx = xq_pop_head(&store->pending_ops);
83 return store->ios + idx;
/* Debug dump of one io to stdout.
 *
 * Prints the aiocb descriptor, the request op, the aiocb offset and byte
 * count, io->retval and the request state, followed by the request name and
 * data. Name and data are first copied into 64-byte local buffers: at most
 * min(namesize, 63) bytes of the name and at most 63 bytes of the data.
 *
 * msg: tag for the record (presumably the first "%s" of the format -- confirm).
 * io:  slot to dump; io->req is dereferenced and must be valid.
 */
86 static void log_io(char *msg, struct io *io)
88 char name[64], data[64];
89 /* null terminate name in case of req->name is less than 63 characters,
90 * and next character after name (aka first byte of next buffer) is not
93 unsigned int end = (io->req->namesize > 63) ? 63 : io->req->namesize;
94 strncpy(name, io->req->name, end);
96 strncpy(data, io->req->data, 63);
98 printf("%s: fd:%u, op:%u %llu:%lu retval: %lu, reqstate: %u\n"
99 "name[%u]:'%s', data[%llu]:\n%s------------------\n\n",
101 (unsigned int)io->cb.aio_fildes,
102 (unsigned int)io->req->op,
103 (unsigned long long)io->cb.aio_offset,
104 (unsigned long)io->cb.aio_nbytes,
105 (unsigned long)io->retval,
106 (unsigned int)io->req->state,
107 (unsigned int)io->req->namesize, name,
108 (unsigned long long)io->req->datasize, data);
/* Finish a request successfully: set the XS_SERVED bit in req->state (other
 * bits are kept), log the io, then call xseg_respond() and xseg_signal() on
 * the port recorded in the request (req->portno). */
111 static void complete(struct store *store, struct io *io)
113 struct xseg_request *req = io->req;
114 req->state |= XS_SERVED;
115 log_io("complete", io);
116 xseg_respond(store->xseg, req->portno, req);
117 xseg_signal(store->xseg, req->portno);
/* Finish a request with failure: set the XS_FAILED bit in req->state (other
 * bits are kept), then call xseg_respond() and xseg_signal() on the port
 * recorded in the request (req->portno). */
121 static void fail(struct store *store, struct io *io)
123 struct xseg_request *req = io->req;
124 req->state |= XS_FAILED;
126 xseg_respond(store->xseg, req->portno, req);
127 xseg_signal(store->xseg, req->portno);
/* Park an io for later polling: the request state is overwritten with
 * XS_PENDING (plain assignment, so previous state bits are dropped) and the
 * slot is queued on store->pending_ops via pending_io(). */
131 static void pending(struct store *store, struct io *io)
133 io->req->state = XS_PENDING;
134 pending_io(store, io);
/* Fallback handler used by dispatch() for ops it does not recognise: writes
 * the text "unknown request op" into the request's data buffer, bounded by
 * req->datasize. */
137 static void handle_unknown(struct store *store, struct io *io)
139 struct xseg_request *req = io->req;
140 snprintf(req->data, req->datasize, "unknown request op");
/* Fill in the per-store parts of the io's aiocb: the file descriptor of the
 * image (store->fd) and a copy of the store-wide sigevent settings. Buffer,
 * length and offset are left for the caller to set. */
144 static inline void prepare_io(struct store *store, struct io *io)
146 io->cb.aio_fildes = store->fd;
147 io->cb.aio_sigevent = store->sigevent;
148 /* cb->aio_sigevent.sigev_value.sival_int = fd; */
/* Handler invoked by dispatch() for read/write requests. It works on the
 * io's aiocb (io->cb) and tracks progress in req->serviced, so a request can
 * be continued from where a previous transfer stopped. */
151 static void handle_read_write(struct store *store, struct io *io)
154 struct xseg_request *req = io->req;
155 struct aiocb *cb = &io->cb;
/* Any state other than XS_ACCEPTED is treated as a request that has been
 * here before: the last result (io->retval) is added to the running total.
 * NOTE(review): presumably reached again from handle_pending() -- confirm. */
157 if (req->state != XS_ACCEPTED) {
159 req->serviced += io->retval;
161 req->datasize = req->serviced;
163 if (req->serviced >= req->datasize) {
170 printf("0.%p vs %p!\n", (void *)req, (void *)io->req);
172 if (req->flags & (XF_FLUSH | XF_FUA)) {
173 /* for now, no FLUSH/FUA support.
174 * note that with FLUSH/size == 0
175 * there will probably be a (uint64_t)-1 offset */
/* Set up the next transfer: buffer pointer, byte count and file offset are
 * all advanced by the number of bytes already serviced. */
184 prepare_io(store, io);
185 cb->aio_buf = req->data + req->serviced;
186 cb->aio_nbytes = req->datasize - req->serviced;
187 cb->aio_offset = req->offset + req->serviced;
/* Diagnostic text is written into the request's own data buffer. */
197 snprintf(req->data, req->datasize,
198 "wtf, corrupt op %u?\n", req->op);
/* The errno description is likewise placed in the request's data buffer. */
204 strerror_r(errno, req->data, req->datasize);
/* Handler invoked by dispatch() for info requests: reports the store size.
 *
 * The request name is compared with the store name (equal namesize and equal
 * first store->namesize bytes). The size is written as a uint64_t at the
 * start of req->data, and both req->serviced and req->datasize become
 * sizeof(store->size). */
212 static void handle_info(struct store *store, struct io *io)
214 struct xseg_request *req = io->req;
/* Condition is true when the name does NOT match this store. */
216 if (req->namesize != store->namesize ||
217 strncmp(req->name, store->name, store->namesize)) {
223 *((uint64_t *) req->data) = store->size;
224 req->serviced = req->datasize = sizeof(store->size);
/* No AIO takes place here; the aiocb fields and retval are simply set to
 * datasize (presumably so that log_io() prints meaningful numbers -- confirm). */
225 io->retval = io->cb.aio_offset = io->cb.aio_nbytes = req->datasize;
/* Route an io to its handler according to io->req->op: handle_read_write(),
 * handle_info(), or handle_unknown() for anything else. */
230 static void dispatch(struct store *store, struct io *io)
232 switch (io->req->op) {
235 handle_read_write(store, io); break;
237 handle_info(store, io); break;
240 handle_unknown(store, io);
/* Check on an io taken from the pending queue: query its AIO status with
 * aio_error(), treat EINPROGRESS (operation not finished yet) as a separate
 * case, and record the final result from aio_return() in io->retval. */
244 static void handle_pending(struct store *store, struct io *io)
246 int r = aio_error(&io->cb);
247 if (r == EINPROGRESS) {
252 io->retval = aio_return(&io->cb);
/* First-time handling of a freshly accepted request: its state is set to
 * XS_ACCEPTED, the value handle_read_write() checks to tell a new request
 * from a continued one. */
261 static void handle_accepted(struct store *store, struct io *io)
263 struct xseg_request *req = io->req;
265 req->state = XS_ACCEPTED;
/* Service routine for one store. Using the store's segment and port number it
 * accepts new requests (xseg_accept) into io slots from alloc_io(), and polls
 * previously queued ios from get_pending_io(). When there is neither a
 * pending io nor an accepted request it blocks in xseg_wait_signal(). */
270 static int blockd_loop(struct store *store)
272 struct xseg *xseg = store->xseg;
273 uint32_t portno = store->portno;
275 struct xseg_request *accepted;
/* Wait is announced before looking for work and cancelled below whenever
 * work is found (presumably so a signal arriving in between is not
 * missed -- confirm against the xseg API). */
279 xseg_prepare_wait(xseg, portno);
280 io = alloc_io(store);
282 accepted = xseg_accept(xseg, portno);
284 xseg_cancel_wait(xseg, portno);
286 handle_accepted(store, io);
291 io = get_pending_io(store);
293 xseg_cancel_wait(xseg, portno);
294 handle_pending(store, io);
/* Nothing accepted and nothing pending: wait for a signal on the port. */
297 if (!io && !accepted)
298 xseg_wait_signal(xseg, portno, 10000);
/* Obtain the segment described by spec. The spec is parsed into a config,
 * xseg_join() is tried with the config's type and name, and the code also
 * contains an xseg_create() followed by a second xseg_join() whose result is
 * returned. The return values of xseg_parse_spec() and xseg_create() are
 * deliberately discarded (cast to void), so a bad spec or a failed create
 * only shows up through the result of the join. */
304 static struct xseg *join(char *spec)
306 struct xseg_config config;
309 (void)xseg_parse_spec(spec, &config);
310 xseg = xseg_join(config.type, config.name);
314 (void)xseg_create(&config);
315 return xseg_join(config.type, config.name);
/* Set up a store for one disk image and run its service loop.
 *
 * path:   image file; opened O_RDWR, and created with mode 0600 when it does
 *         not exist and size is non-zero. Also used as the store name.
 * size:   requested image size in bytes; the file's own st_size is also read
 *         via fstat() (presumably used when size is 0 -- confirm).
 * nr_ops: number of io slots; sizes the ios array and both index queues.
 * spec:   segment specification handed to join().
 * portno: port number handed to xseg_bind_port().
 *
 * On the success path visible here, returns the value of blockd_loop().
 */
318 static int blockd(char *path, off_t size, uint32_t nr_ops,
319 char *spec, long portno)
326 store = malloc(sizeof(struct store));
/* strncpy() does not terminate on truncation, hence the explicit NUL. */
332 strncpy(store->name, path, TARGET_NAMELEN);
333 store->name[TARGET_NAMELEN - 1] = '\0';
334 store->namesize = strlen(store->name);
336 store->fd = open(path, O_RDWR);
/* A missing file is created only when a size was supplied. */
337 while (store->fd < 0) {
338 if (errno == ENOENT && size)
339 store->fd = open(path, O_RDWR | O_CREAT, 0600);
347 r = fstat(store->fd, &stat);
352 size = (uint64_t) stat.st_size;
354 fprintf(stderr, "size cannot be zero\n");
/* Writing one byte at offset size-1 makes the file at least size bytes long.
 * NOTE(review): the lseek() result is not checked, and the write replaces
 * whatever byte an existing image holds at size-1 -- confirm this only runs
 * for new or too-short images. */
359 lseek(store->fd, size-1, SEEK_SET);
360 if (write(store->fd, &r, 1) != 1) {
/* AIO completion is reported by signal: SIGIO, handled by
 * sigaction_handler() with SA_SIGINFO. */
373 store->sigevent.sigev_notify = SIGEV_SIGNAL;
374 store->sigevent.sigev_signo = SIGIO;
375 sa.sa_sigaction = sigaction_handler;
376 sa.sa_flags = SA_SIGINFO;
377 if (sigemptyset(&sa.sa_mask))
378 perror("sigemptyset");
380 if (sigaction(SIGIO, &sa, NULL)) {
/* Backing storage: one index array per queue plus the io slots themselves,
 * all zero-initialised by calloc(). */
385 store->nr_ops = nr_ops;
386 store->free_bufs = calloc(nr_ops, sizeof(xqindex));
387 if (!store->free_bufs)
390 store->pending_bufs = calloc(nr_ops, sizeof(xqindex));
391 if (!store->pending_bufs)
394 store->ios = calloc(nr_ops, sizeof(struct io));
/* free_ops starts populated (presumably with indices 0..nr_ops-1 -- confirm
 * against xq_init_seq); pending_ops starts empty. */
401 xq_init_seq(&store->free_ops, nr_ops, nr_ops, store->free_bufs);
402 xq_init_empty(&store->pending_ops, nr_ops, store->pending_bufs);
404 if (xseg_initialize("posix")) {
405 printf("cannot initialize library\n");
408 store->xseg = join(spec);
412 store->xport = xseg_bind_port(store->xseg, portno);
414 printf("cannot bind to port %ld\n", portno);
/* The port number actually in use is read back from the bound port. */
418 store->portno = xseg_portno(store->xseg, store->xport);
419 printf("blockd on port %u/%u\n",
420 store->portno, store->xseg->config.nr_ports);
422 return blockd_loop(store);
/* Program entry: scans the options starting at argv[2] and hands the
 * collected settings to blockd(), whose result becomes the exit status.
 *
 * Recognised options, each consuming the following argument:
 *   -g  segment spec          -s  image size in bytes
 *   -p  port number           -n  number of parallel operations
 * spec defaults to the empty string. The image path is presumably taken from
 * argv[1] -- confirm.
 *
 * NOTE(review): the numeric options are converted with a NULL end pointer, so
 * non-numeric or trailing text is not detected at the calls shown here.
 */
425 int main(int argc, char **argv)
427 char *path, *spec = "";
441 for (i = 2; i < argc; i++) {
442 if (!strcmp(argv[i], "-g") && i + 1 < argc) {
448 if (!strcmp(argv[i], "-s") && i + 1 < argc) {
449 size = strtoull(argv[i+1], NULL, 10);
454 if (!strcmp(argv[i], "-p") && i + 1 < argc) {
455 portno = strtoul(argv[i+1], NULL, 10);
460 if (!strcmp(argv[i], "-n") && i + 1 < argc) {
461 nr_ops = strtoul(argv[i+1], NULL, 10);
470 return blockd(path, size, nr_ops, spec, portno);