13 #include <xseg/xseg.h>
/*
 * Print the command-line synopsis for blockd on stdout.
 *
 * The text documents four options: -p (port number), -s (image size in
 * bytes), -g (segment spec) and -n (number of parallel operations).
 * NOTE(review): the return statement is not visible in this excerpt.
 */
15 static int usage(void)
17 printf("Usage: ./blockd <path_to_disk_image> [options]\n"
18 "Options: [-p portno]\n"
19 " [-s image size in bytes]\n"
20 " [-g type:name:nr_ports:nr_requests:request_size:extra_size:page_shift]\n"
21 " [-n nr_parallel_ops]\n");
/* The xseg request this I/O slot is servicing; the handlers in this file
 * reach it as io->req.
 * NOTE(review): the enclosing struct header is not visible in this excerpt. */
27 struct xseg_request *req;
/* NOTE(review): the enclosing struct header is not visible in this excerpt;
 * these members are accessed through `struct store *` elsewhere in the file. */
/* Port handle obtained from xseg_bind_port() in blockd(). */
33 struct xseg_port *xport;
/* Queue of indices into store->ios, filled by pending_io() and drained by
 * get_pending_io(). */
40 struct xq pending_ops;
/* Notification settings copied into every aiocb by prepare_io(); blockd()
 * sets them to SIGEV_SIGNAL with SIGIO. */
43 struct sigevent sigevent;
/* File-scope counter, zero-initialised because it has static storage.
 * NOTE(review): presumably bumped by sigaction_handler() -- its body is not
 * visible here; confirm. */
46 static unsigned long sigaction_count;
/*
 * Signal handler using the three-argument (SA_SIGINFO) signature; blockd()
 * installs it for SIGIO.
 * NOTE(review): the body is not visible in this excerpt.
 */
48 static void sigaction_handler(int sig, siginfo_t *siginfo, void *arg)
/*
 * Take an index from the head of store->free_ops and return the matching
 * element of store->ios.
 *
 * NOTE(review): the lines between the pop and the return are not visible;
 * presumably they handle an empty free queue -- confirm before relying on
 * the result being non-NULL.
 */
53 static struct io *alloc_io(struct store *store)
55 xqindex idx = xq_pop_head(&store->free_ops);
58 return store->ios + idx;
/*
 * Give an I/O slot back to the free queue.
 *
 * The slot's index is its offset from the start of store->ios; that index is
 * pushed at the head of store->free_ops.
 * NOTE(review): one line between these two statements is not visible here.
 */
61 static inline void free_io(struct store *store, struct io *io)
63 xqindex idx = io - store->ios;
65 xq_append_head(&store->free_ops, idx);
/*
 * Record an I/O slot as pending: its index within store->ios is pushed at the
 * head of store->pending_ops.
 */
68 static inline void pending_io(struct store *store, struct io *io)
70 xqindex idx = io - store->ios;
71 xq_append_head(&store->pending_ops, idx);
/*
 * Take an index from the head of store->pending_ops and return the matching
 * element of store->ios.
 *
 * NOTE(review): the lines between the pop and the return are not visible;
 * presumably they handle an empty queue (blockd_loop() tests the result
 * against NULL) -- confirm.
 * NOTE(review): pending_io() pushes at the head and this pops from the head;
 * verify that this is the intended service order.
 */
74 static inline struct io *get_pending_io(struct store *store)
76 xqindex idx = xq_pop_head(&store->pending_ops);
79 return store->ios + idx;
/*
 * Print a debug dump of one I/O slot on stdout.
 *
 * The dump shows the aiocb's file descriptor, offset and byte count,
 * io->retval, the request's op, state, name size and data size, and copies
 * of the request name and data held in two 64-byte local buffers. The name
 * copy is limited to min(namesize, 63) bytes and the data copy to 63 bytes.
 *
 * NOTE(review): strncpy() does not NUL-terminate when the source fills the
 * whole count, and the terminating stores are not visible in this excerpt --
 * confirm both buffers are terminated before they reach the "%s" conversions.
 * NOTE(review): the format string has one more conversion (the leading "%s")
 * than the arguments visible here; presumably `msg` is passed on a line that
 * is not shown.
 */
82 static void log_io(char *msg, struct io *io)
84 char name[64], data[64];
85 /* null terminate name in case of req->name is less than 63 characters,
86 * and next character after name (aka first byte of next buffer) is not
89 unsigned int end = (io->req->namesize > 63) ? 63 : io->req->namesize;
90 strncpy(name, io->req->name, end);
92 strncpy(data, io->req->data, 63);
94 printf("%s: fd:%u, op:%u %llu:%lu retval: %lu, reqstate: %u\n"
95 "name[%u]:'%s', data[%llu]:\n%s------------------\n\n",
97 (unsigned int)io->cb.aio_fildes,
98 (unsigned int)io->req->op,
99 (unsigned long long)io->cb.aio_offset,
100 (unsigned long)io->cb.aio_nbytes,
101 (unsigned long)io->retval,
102 (unsigned int)io->req->state,
103 (unsigned int)io->req->namesize, name,
104 (unsigned long long)io->req->datasize, data);
/*
 * Finish a request successfully: OR XS_SERVED into its state, log it, hand
 * it back with xseg_respond() on the request's own port, then signal that
 * port.
 *
 * NOTE(review): the return values of xseg_respond() and xseg_signal() are
 * not checked in the visible lines.
 * NOTE(review): the tail of the function is not visible; presumably the io
 * slot is released there -- confirm.
 */
107 static void complete(struct store *store, struct io *io)
109 struct xseg_request *req = io->req;
110 req->state |= XS_SERVED;
111 log_io("complete", io);
112 xseg_respond(store->xseg, req->portno, req);
113 xseg_signal(store->xseg, req->portno);
/*
 * Finish a request with an error: OR XS_ERROR into its state, hand it back
 * with xseg_respond() on the request's own port, then signal that port.
 *
 * NOTE(review): the return values of xseg_respond() and xseg_signal() are
 * not checked in the visible lines.
 * NOTE(review): one line after the state update and the tail of the function
 * are not visible (presumably logging and releasing the io slot) -- confirm.
 */
117 static void fail(struct store *store, struct io *io)
119 struct xseg_request *req = io->req;
120 req->state |= XS_ERROR;
122 xseg_respond(store->xseg, req->portno, req);
123 xseg_signal(store->xseg, req->portno);
/*
 * Park an I/O slot until its asynchronous operation finishes: set the
 * request state to XS_PENDING and queue the slot with pending_io().
 *
 * The state is overwritten with a plain assignment, whereas complete() and
 * fail() OR their flag into the existing state.
 */
127 static void pending(struct store *store, struct io *io)
129 io->req->state = XS_PENDING;
130 pending_io(store, io);
/*
 * Handle a request whose op is not recognised: write the text
 * "unknown request op" into the request's data buffer, bounded by
 * req->datasize.
 *
 * NOTE(review): the rest of the body is not visible; presumably the request
 * is then failed -- confirm.
 */
133 static void handle_unknown(struct store *store, struct io *io)
135 struct xseg_request *req = io->req;
136 snprintf(req->data, req->datasize, "unknown request op");
/*
 * Fill in the parts of the aiocb that are the same for every operation: the
 * store's file descriptor and a copy of the store's sigevent settings.
 * Buffer, length and offset are set by the caller.
 */
140 static inline void prepare_io(struct store *store, struct io *io)
142 io->cb.aio_fildes = store->fd;
143 io->cb.aio_sigevent = store->sigevent;
144 /* cb->aio_sigevent.sigev_value.sival_int = fd; */
/*
 * Service a read or write request through the slot's aiocb.
 *
 * What the visible lines establish:
 *  - when the request is not in state XS_ACCEPTED, io->retval is added to
 *    req->serviced;
 *  - the request is compared against req->datasize to decide whether all of
 *    it has been serviced;
 *  - XSEG_FLUSH / XSEG_FUA flags are tested but not supported yet;
 *  - the aiocb is set up to cover the part of the request not yet serviced;
 *  - error text is written into req->data, bounded by req->datasize.
 *
 * NOTE(review): most of the control structure (branches, the aio submission
 * call, the calls that complete/fail/park the request) is on lines not shown
 * in this excerpt, so the exact ordering of the steps above is unconfirmed.
 */
147 static void handle_read_write(struct store *store, struct io *io)
150 struct xseg_request *req = io->req;
151 struct aiocb *cb = &io->cb;
/* Not freshly accepted: account for the result stored in io->retval. */
153 if (req->state != XS_ACCEPTED) {
155 req->serviced += io->retval;
/* NOTE(review): the condition guarding this truncation is not visible
 * (presumably a zero-byte result, i.e. end of file). After it runs, the
 * comparison below is trivially true -- confirm that is intended. */
157 req->datasize = req->serviced;
159 if (req->serviced >= req->datasize) {
166 printf("0.%p vs %p!\n", (void *)req, (void *)io->req);
168 if (req->flags & (XSEG_FLUSH | XSEG_FUA)) {
169 /* for now, no FLUSH/FUA support.
170 * note that with FLUSH/size == 0
171 * there will probably be a (uint64_t)-1 offset */
/* Describe the remaining part of the transfer: buffer, length and file
 * offset are all advanced by the bytes already serviced. */
180 prepare_io(store, io);
181 cb->aio_buf = req->data + req->serviced;
182 cb->aio_nbytes = req->datasize - req->serviced;
183 cb->aio_offset = req->offset + req->serviced;
193 snprintf(req->data, req->datasize,
194 "wtf, corrupt op %u?\n", req->op);
/* NOTE(review): strerror_r() exists in an XSI and a GNU variant; the GNU one
 * may return a pointer to a static string without writing to the buffer --
 * confirm which variant this file is built against. */
200 strerror_r(errno, req->data, req->datasize);
/*
 * Route a request to its handler based on io->req->op: handle_read_write()
 * for some ops, handle_unknown() otherwise.
 * NOTE(review): the case labels are on lines not shown in this excerpt.
 */
208 static void dispatch(struct store *store, struct io *io)
210 switch (io->req->op) {
213 handle_read_write(store, io); break;
216 handle_unknown(store, io);
/*
 * Check on a parked asynchronous operation.
 *
 * aio_error() reports its status; EINPROGRESS means it has not finished yet.
 * Once finished, its result is collected with aio_return() and stored in
 * io->retval.
 *
 * NOTE(review): what happens in the EINPROGRESS case and after the result is
 * stored is on lines not shown in this excerpt.
 */
220 static void handle_pending(struct store *store, struct io *io)
222 int r = aio_error(&io->cb);
223 if (r == EINPROGRESS) {
228 io->retval = aio_return(&io->cb);
/*
 * First handling of a newly accepted request: its state is set to
 * XS_ACCEPTED.
 * NOTE(review): the remaining lines of the body are not visible; presumably
 * the request is then dispatched -- confirm.
 */
237 static void handle_accepted(struct store *store, struct io *io)
239 struct xseg_request *req = io->req;
241 req->state = XS_ACCEPTED;
/*
 * Service loop for one store.
 *
 * The visible steps are: announce the intention to wait on the port, try to
 * get a free io slot and accept a new request, handle the accepted request,
 * then take one parked operation and check on it. When there was neither a
 * parked operation nor a newly accepted request, the loop blocks in
 * xseg_wait_signal().
 *
 * NOTE(review): the loop construct, the conditions around each step and the
 * return statement are on lines not shown in this excerpt.
 */
246 static int blockd_loop(struct store *store)
248 struct xseg *xseg = store->xseg;
249 uint32_t portno = store->portno;
251 struct xseg_request *accepted;
/* Register as a waiter before looking for work. */
255 xseg_prepare_wait(xseg, portno);
256 io = alloc_io(store);
258 accepted = xseg_accept(xseg, portno);
/* There is work to do, so withdraw the wait announced above. */
260 xseg_cancel_wait(xseg, portno);
262 handle_accepted(store, io);
267 io = get_pending_io(store);
269 xseg_cancel_wait(xseg, portno);
270 handle_pending(store, io);
/* Nothing parked and nothing accepted: sleep until signalled.
 * NOTE(review): the unit of the 10000 timeout argument is not visible here. */
273 if (!io && !accepted)
274 xseg_wait_signal(xseg, portno, 10000);
/*
 * Join the segment described by `spec`, creating it when needed.
 *
 * The spec is parsed into an xseg_config and a join is attempted with the
 * configured type and name. The visible fallback creates the segment and
 * joins again.
 *
 * NOTE(review): the results of xseg_parse_spec() and xseg_create() are
 * explicitly discarded with (void) casts, so a malformed spec or a failed
 * create only shows up as a failed join.
 * NOTE(review): the check between the first join and the create is on a line
 * not shown in this excerpt.
 */
280 static struct xseg *join(char *spec)
282 struct xseg_config config;
285 (void)xseg_parse_spec(spec, &config);
286 xseg = xseg_join(config.type, config.name);
290 (void)xseg_create(&config);
291 return xseg_join(config.type, config.name);
/*
 * Set up a store backed by the image at `path` and run the service loop.
 *
 * path    image file to open (created when missing and size is non-zero)
 * size    image size in bytes
 * nr_ops  number of io slots, and the capacity of the free/pending queues
 * spec    segment spec passed to join()
 * portno  port passed to xseg_bind_port()
 *
 * Returns the result of blockd_loop() on the visible success path.
 *
 * NOTE(review): the error paths are on lines not shown in this excerpt;
 * confirm that the store, the descriptor and the queue buffers are released
 * on failure.
 */
294 static int blockd(char *path, unsigned long size, uint32_t nr_ops,
295 char *spec, long portno)
302 store = malloc(sizeof(struct store));
/* Open the existing image; when it does not exist and a size was given,
 * create it readable/writable by the owner only.
 * NOTE(review): the exit from this retry loop for other errors is not
 * visible -- confirm it cannot spin. */
308 store->fd = open(path, O_RDWR);
309 while (store->fd < 0) {
310 if (errno == ENOENT && size)
311 store->fd = open(path, O_RDWR | O_CREAT, 0600);
319 r = fstat(store->fd, &stat);
326 fprintf(stderr, "size cannot be zero\n");
/* Seek to the last byte of the requested size and write a single byte
 * there (the first byte of `r`).
 * NOTE(review): the lseek() result is not checked in the visible lines. */
331 lseek(store->fd, size-1, SEEK_SET);
332 if (write(store->fd, &r, 1) != 1) {
/* Asynchronous operations notify by raising SIGIO, which is routed to
 * sigaction_handler() with the SA_SIGINFO calling convention. */
343 store->sigevent.sigev_notify = SIGEV_SIGNAL;
344 store->sigevent.sigev_signo = SIGIO;
345 sa.sa_sigaction = sigaction_handler;
346 sa.sa_flags = SA_SIGINFO;
347 if (sigemptyset(&sa.sa_mask))
348 perror("sigemptyset");
350 if (sigaction(SIGIO, &sa, NULL)) {
/* Backing arrays for the two index queues and the io slots, each with
 * nr_ops entries. */
355 store->nr_ops = nr_ops;
356 store->free_bufs = calloc(nr_ops, sizeof(xqindex));
357 if (!store->free_bufs)
360 store->pending_bufs = calloc(nr_ops, sizeof(xqindex));
361 if (!store->pending_bufs)
364 store->ios = calloc(nr_ops, sizeof(struct io));
/* free_ops is initialised via xq_init_seq(), pending_ops starts empty. */
371 xq_init_seq(&store->free_ops, nr_ops, nr_ops, store->free_bufs);
372 xq_init_empty(&store->pending_ops, nr_ops, store->pending_bufs);
374 if (xseg_initialize("posix")) {
375 printf("cannot initialize library\n");
378 store->xseg = join(spec);
382 store->xport = xseg_bind_port(store->xseg, portno);
384 printf("cannot bind to port %ld\n", portno);
/* Record the port number actually assigned to the bound port. */
388 store->portno = xseg_portno(store->xseg, store->xport);
389 printf("blockd on port %u/%u\n",
390 store->portno, store->xseg->config.nr_ports);
392 return blockd_loop(store);
/*
 * Entry point: parse the command line and start blockd().
 *
 * Options are scanned from argv[2] onwards; each recognised option takes the
 * following argument as its value: -g (segment spec), -s (image size) and
 * -p (port number).
 *
 * NOTE(review): the bodies of the option branches and the handling of
 * argv[1] are on lines not shown in this excerpt.
 * NOTE(review): the strtoul() results are not validated in the visible
 * lines, so non-numeric input is accepted silently.
 */
395 int main(int argc, char **argv)
397 char *path, *spec = "";
411 for (i = 2; i < argc; i++) {
412 if (!strcmp(argv[i], "-g") && i + 1 < argc) {
418 if (!strcmp(argv[i], "-s") && i + 1 < argc) {
419 size = strtoul(argv[i+1], NULL, 10);
424 if (!strcmp(argv[i], "-p") && i + 1 < argc) {
425 portno = strtoul(argv[i+1], NULL, 10);
/* NOTE(review): likely bug -- this branch tests "-p" a second time but
 * stores into nr_ops, while usage() advertises "-n nr_parallel_ops". If the
 * previous "-p" branch consumes the option, this one is never reached and
 * nr_ops cannot be set from the command line. The test should presumably be
 * against "-n". */
430 if (!strcmp(argv[i], "-p") && i + 1 < argc) {
431 nr_ops = strtoul(argv[i+1], NULL, 10);
440 return blockd(path, size, nr_ops, spec, portno);