Statistics
| Branch: | Tag: | Revision:

root / xseg / peers / filed.c @ 0072a4af

History | View | Annotate | Download (14.1 kB)

1 ff5b1cbf Giannakos Filippos
#define _GNU_SOURCE
2 ff5b1cbf Giannakos Filippos
#include <stdio.h>
3 ff5b1cbf Giannakos Filippos
#include <stdlib.h>
4 ff5b1cbf Giannakos Filippos
#include <sys/types.h>
5 ff5b1cbf Giannakos Filippos
#include <sys/stat.h>
6 ff5b1cbf Giannakos Filippos
#include <unistd.h>
7 ff5b1cbf Giannakos Filippos
#include <string.h>
8 ff5b1cbf Giannakos Filippos
#include <fcntl.h>
9 ff5b1cbf Giannakos Filippos
#include <errno.h>
10 ff5b1cbf Giannakos Filippos
#include <aio.h>
11 ff5b1cbf Giannakos Filippos
#include <signal.h>
12 ff5b1cbf Giannakos Filippos
#include <limits.h>
13 ff5b1cbf Giannakos Filippos
#include <xseg/xseg.h>
14 ff5b1cbf Giannakos Filippos
#include <pthread.h>
15 ff5b1cbf Giannakos Filippos
16 ff5b1cbf Giannakos Filippos
#define MAX_PATH_SIZE 255
17 ff5b1cbf Giannakos Filippos
#define MAX_FILENAME_SIZE 255
18 ff5b1cbf Giannakos Filippos
19 ff5b1cbf Giannakos Filippos
static int usage(void)
20 ff5b1cbf Giannakos Filippos
{
21 ff5b1cbf Giannakos Filippos
        printf("Usage: ./filed <path_to_directory> [options]\n"
22 ff5b1cbf Giannakos Filippos
                "Options: [-p portno]\n"
23 ff5b1cbf Giannakos Filippos
                "         [-g type:name:nr_ports:nr_requests:request_size:extra_size:page_shift]\n"
24 ff5b1cbf Giannakos Filippos
                "         [-n nr_parallel_ops]\n");
25 ff5b1cbf Giannakos Filippos
        return 1;
26 ff5b1cbf Giannakos Filippos
}
27 ff5b1cbf Giannakos Filippos
28 ff5b1cbf Giannakos Filippos
struct fsync_io {
29 ff5b1cbf Giannakos Filippos
        unsigned long cacheidx;
30 ff5b1cbf Giannakos Filippos
        int fd;
31 ff5b1cbf Giannakos Filippos
        uint64_t time;
32 ff5b1cbf Giannakos Filippos
};
33 ff5b1cbf Giannakos Filippos
34 ff5b1cbf Giannakos Filippos
struct io {
35 ff5b1cbf Giannakos Filippos
        struct store *store;
36 ff5b1cbf Giannakos Filippos
        struct xseg_request *req;
37 ff5b1cbf Giannakos Filippos
        ssize_t retval;
38 ff5b1cbf Giannakos Filippos
        long fdcacheidx;
39 0b67b2ab Giannakos Filippos
        pthread_cond_t cond;
40 0b67b2ab Giannakos Filippos
        pthread_mutex_t lock;
41 ff5b1cbf Giannakos Filippos
};
42 ff5b1cbf Giannakos Filippos
43 5eae9d27 Giannakos Filippos
#define READY (1 << 1)
44 ff5b1cbf Giannakos Filippos
45 0072a4af Vangelis Koukis
struct fdcache_node {
46 ff5b1cbf Giannakos Filippos
        volatile int fd;
47 ff5b1cbf Giannakos Filippos
        volatile unsigned int ref;
48 ff5b1cbf Giannakos Filippos
        volatile unsigned long time;
49 ff5b1cbf Giannakos Filippos
        volatile unsigned int flags;
50 ff5b1cbf Giannakos Filippos
        pthread_cond_t cond;
51 0072a4af Vangelis Koukis
        char name[MAX_FILENAME_SIZE + 1];
52 ff5b1cbf Giannakos Filippos
};
53 ff5b1cbf Giannakos Filippos
54 ff5b1cbf Giannakos Filippos
struct store {
55 ff5b1cbf Giannakos Filippos
        struct xseg *xseg;
56 ff5b1cbf Giannakos Filippos
        struct xseg_port *xport;
57 ff5b1cbf Giannakos Filippos
        uint32_t portno;
58 ff5b1cbf Giannakos Filippos
        uint64_t size;
59 ff5b1cbf Giannakos Filippos
        struct io *ios;
60 0b67b2ab Giannakos Filippos
        struct xq free_ops;
61 0b67b2ab Giannakos Filippos
        char *free_bufs;
62 ff5b1cbf Giannakos Filippos
        long nr_ops;
63 ff5b1cbf Giannakos Filippos
        struct sigevent sigevent;
64 ff5b1cbf Giannakos Filippos
        int dirfd;
65 ff5b1cbf Giannakos Filippos
        uint32_t path_len;
66 ff5b1cbf Giannakos Filippos
        uint64_t handled_reqs;
67 ff5b1cbf Giannakos Filippos
        unsigned long maxfds;
68 0072a4af Vangelis Koukis
        struct fdcache_node *fdcache;
69 ff5b1cbf Giannakos Filippos
        pthread_t *iothread;
70 ff5b1cbf Giannakos Filippos
        pthread_mutex_t cache_lock;
71 0b67b2ab Giannakos Filippos
        char path[MAX_PATH_SIZE + 1];
72 ff5b1cbf Giannakos Filippos
};
73 ff5b1cbf Giannakos Filippos
74 ff5b1cbf Giannakos Filippos
static unsigned long sigaction_count;
75 ff5b1cbf Giannakos Filippos
76 ff5b1cbf Giannakos Filippos
static void sigaction_handler(int sig, siginfo_t *siginfo, void *arg)
77 ff5b1cbf Giannakos Filippos
{
78 0072a4af Vangelis Koukis
        sigaction_count++;
79 ff5b1cbf Giannakos Filippos
}
80 ff5b1cbf Giannakos Filippos
81 ff5b1cbf Giannakos Filippos
static void log_io(char *msg, struct io *io)
82 ff5b1cbf Giannakos Filippos
{
83 ff5b1cbf Giannakos Filippos
        char name[64], data[64];
84 ff5b1cbf Giannakos Filippos
        /* null terminate name in case of req->name is less than 63 characters,
85 ff5b1cbf Giannakos Filippos
         * and next character after name (aka first byte of next buffer) is not
86 ff5b1cbf Giannakos Filippos
         * null
87 ff5b1cbf Giannakos Filippos
         */
88 ff5b1cbf Giannakos Filippos
        unsigned int end = (io->req->namesize > 63) ? 63 : io->req->namesize;
89 ff5b1cbf Giannakos Filippos
        strncpy(name, io->req->name, end);
90 ff5b1cbf Giannakos Filippos
        name[end] = 0;
91 ff5b1cbf Giannakos Filippos
        strncpy(data, io->req->data, 63);
92 ff5b1cbf Giannakos Filippos
        data[63] = 0;
93 0072a4af Vangelis Koukis
#if 0
94 0072a4af Vangelis Koukis
vkoukis debug off
95 0072a4af Vangelis Koukis
        fprintf(stderr,
96 0072a4af Vangelis Koukis
                "%s: fd:%u, op:%u offset: %llu size: %lu retval: %lu, reqstate: %u\n"
97 0072a4af Vangelis Koukis
                "name[%u]: '%s', data[%llu]:\n%s------------------\n\n",
98 ff5b1cbf Giannakos Filippos
                msg,
99 ff5b1cbf Giannakos Filippos
                (unsigned int)io->fdcacheidx, //this is cacheidx not fd
100 ff5b1cbf Giannakos Filippos
                (unsigned int)io->req->op,
101 ff5b1cbf Giannakos Filippos
                (unsigned long long)io->req->offset,
102 ff5b1cbf Giannakos Filippos
                (unsigned long)io->req->size,
103 ff5b1cbf Giannakos Filippos
                (unsigned long)io->retval,
104 ff5b1cbf Giannakos Filippos
                (unsigned int)io->req->state,
105 ff5b1cbf Giannakos Filippos
                (unsigned int)io->req->namesize, name,
106 ff5b1cbf Giannakos Filippos
                (unsigned long long)io->req->datasize, data);
107 0072a4af Vangelis Koukis
#endif
108 ff5b1cbf Giannakos Filippos
}
109 ff5b1cbf Giannakos Filippos
110 0b67b2ab Giannakos Filippos
static struct io *alloc_io(struct store *store)
111 0b67b2ab Giannakos Filippos
{
112 0b67b2ab Giannakos Filippos
        xqindex idx = xq_pop_head(&store->free_ops);
113 0b67b2ab Giannakos Filippos
        if (idx == None)
114 0b67b2ab Giannakos Filippos
                return NULL;
115 0b67b2ab Giannakos Filippos
        return store->ios + idx;
116 0b67b2ab Giannakos Filippos
}
117 0b67b2ab Giannakos Filippos
118 0b67b2ab Giannakos Filippos
static inline void free_io(struct store *store, struct io *io)
119 0b67b2ab Giannakos Filippos
{
120 0b67b2ab Giannakos Filippos
        xqindex idx = io - store->ios;
121 0b67b2ab Giannakos Filippos
        io->req = NULL;
122 0b67b2ab Giannakos Filippos
        xq_append_head(&store->free_ops, idx);
123 0b67b2ab Giannakos Filippos
}
124 0b67b2ab Giannakos Filippos
125 0b67b2ab Giannakos Filippos
126 ff5b1cbf Giannakos Filippos
static void complete(struct store *store, struct io *io)
127 ff5b1cbf Giannakos Filippos
{
128 ff5b1cbf Giannakos Filippos
        struct xseg_request *req = io->req;
129 ff5b1cbf Giannakos Filippos
        req->state |= XS_SERVED;
130 ff5b1cbf Giannakos Filippos
        log_io("complete", io);
131 ff5b1cbf Giannakos Filippos
        xseg_respond(store->xseg, req->portno, req);
132 ff5b1cbf Giannakos Filippos
        xseg_signal(store->xseg, req->portno);
133 5eae9d27 Giannakos Filippos
        __sync_fetch_and_sub(&store->fdcache[io->fdcacheidx].ref, 1);
134 ff5b1cbf Giannakos Filippos
}
135 ff5b1cbf Giannakos Filippos
136 ff5b1cbf Giannakos Filippos
static void fail(struct store *store, struct io *io)
137 ff5b1cbf Giannakos Filippos
{
138 ff5b1cbf Giannakos Filippos
        struct xseg_request *req = io->req;
139 e0cd4893 Vangelis Koukis
        req->state |= XS_FAILED;
140 ff5b1cbf Giannakos Filippos
        log_io("fail", io);
141 ff5b1cbf Giannakos Filippos
        xseg_respond(store->xseg, req->portno, req);
142 ff5b1cbf Giannakos Filippos
        xseg_signal(store->xseg, req->portno);
143 ff5b1cbf Giannakos Filippos
        if (io->fdcacheidx >= 0) {
144 5eae9d27 Giannakos Filippos
                __sync_fetch_and_sub(&store->fdcache[io->fdcacheidx].ref, 1);
145 ff5b1cbf Giannakos Filippos
        }
146 ff5b1cbf Giannakos Filippos
}
147 ff5b1cbf Giannakos Filippos
148 ff5b1cbf Giannakos Filippos
static void pending(struct store *store, struct io *io)
149 ff5b1cbf Giannakos Filippos
{
150 ff5b1cbf Giannakos Filippos
        io->req->state = XS_PENDING;
151 ff5b1cbf Giannakos Filippos
}
152 ff5b1cbf Giannakos Filippos
153 ff5b1cbf Giannakos Filippos
static void handle_unknown(struct store *store, struct io *io)
154 ff5b1cbf Giannakos Filippos
{
155 ff5b1cbf Giannakos Filippos
        struct xseg_request *req = io->req;
156 ff5b1cbf Giannakos Filippos
        snprintf(req->data, req->datasize, "unknown request op");
157 ff5b1cbf Giannakos Filippos
        fail(store, io);
158 ff5b1cbf Giannakos Filippos
}
159 ff5b1cbf Giannakos Filippos
160 ff5b1cbf Giannakos Filippos
static inline void prepare_io(struct store *store, struct io *io)
161 ff5b1cbf Giannakos Filippos
{
162 ff5b1cbf Giannakos Filippos
}
163 ff5b1cbf Giannakos Filippos
164 ff5b1cbf Giannakos Filippos
165 0072a4af Vangelis Koukis
static int dir_open(        struct store *store, struct io *io,
166 0072a4af Vangelis Koukis
                        char *name, uint32_t namesize, int mode        )
167 0072a4af Vangelis Koukis
{
168 ff5b1cbf Giannakos Filippos
        int fd = -1, r;
169 0072a4af Vangelis Koukis
        struct fdcache_node *ce = NULL;
170 5eae9d27 Giannakos Filippos
        long i, lru;
171 5eae9d27 Giannakos Filippos
        uint64_t min;
172 ff5b1cbf Giannakos Filippos
        io->fdcacheidx = -1;
173 ff5b1cbf Giannakos Filippos
        if (namesize > MAX_FILENAME_SIZE)
174 ff5b1cbf Giannakos Filippos
                goto out_err;
175 ff5b1cbf Giannakos Filippos
176 ff5b1cbf Giannakos Filippos
start:
177 0b67b2ab Giannakos Filippos
        /* check cache */
178 34609aff Giannakos Filippos
        pthread_mutex_lock(&store->cache_lock);
179 34609aff Giannakos Filippos
start_locked:
180 5eae9d27 Giannakos Filippos
        lru = -1;
181 5eae9d27 Giannakos Filippos
        min = UINT64_MAX;
182 ff5b1cbf Giannakos Filippos
        for (i = 0; i < store->maxfds; i++) {
183 ff5b1cbf Giannakos Filippos
                if (store->fdcache[i].ref == 0 && min > store->fdcache[i].time 
184 0072a4af Vangelis Koukis
                                && (store->fdcache[i].flags & READY)) {
185 ff5b1cbf Giannakos Filippos
                        min = store->fdcache[i].time;
186 ff5b1cbf Giannakos Filippos
                        lru = i;
187 ff5b1cbf Giannakos Filippos
188 ff5b1cbf Giannakos Filippos
                }
189 0072a4af Vangelis Koukis
                if (!strncmp(store->fdcache[i].name, name, namesize)) {
190 0072a4af Vangelis Koukis
                        if (store->fdcache[i].name[namesize] == 0) {
191 0072a4af Vangelis Koukis
                                ce = &store->fdcache[i];
192 0b67b2ab Giannakos Filippos
                                /* if any other io thread is currently opening
193 0b67b2ab Giannakos Filippos
                                 * the file, block until it succeeds or fails
194 0b67b2ab Giannakos Filippos
                                 */
195 0072a4af Vangelis Koukis
                                if (!(ce->flags & READY)) {
196 0072a4af Vangelis Koukis
                                        pthread_cond_wait(&ce->cond, &store->cache_lock);
197 34609aff Giannakos Filippos
                                        /* when ready, restart lookup */
198 34609aff Giannakos Filippos
                                        goto start_locked;
199 ff5b1cbf Giannakos Filippos
                                }
200 0b67b2ab Giannakos Filippos
                                /* if successfully opened */
201 0072a4af Vangelis Koukis
                                if (ce->fd > 0) {
202 ff5b1cbf Giannakos Filippos
                                        fd = store->fdcache[i].fd;
203 ff5b1cbf Giannakos Filippos
                                        io->fdcacheidx = i;
204 ff5b1cbf Giannakos Filippos
                                        goto out;
205 ff5b1cbf Giannakos Filippos
                                }
206 0b67b2ab Giannakos Filippos
                                /* else open failed for the other io thread, so
207 0b67b2ab Giannakos Filippos
                                 * it should fail for everyone waiting on this
208 0b67b2ab Giannakos Filippos
                                 * file.
209 0b67b2ab Giannakos Filippos
                                 */
210 ff5b1cbf Giannakos Filippos
                                else {
211 ff5b1cbf Giannakos Filippos
                                        fd = -1;
212 ff5b1cbf Giannakos Filippos
                                        io->fdcacheidx = -1;
213 ff5b1cbf Giannakos Filippos
                                        goto out_err_unlock;
214 ff5b1cbf Giannakos Filippos
                                }
215 ff5b1cbf Giannakos Filippos
                        }
216 ff5b1cbf Giannakos Filippos
                }
217 ff5b1cbf Giannakos Filippos
        }
218 0b67b2ab Giannakos Filippos
        if (lru < 0){
219 0b67b2ab Giannakos Filippos
                /* all cache entries are currently being used */
220 ff5b1cbf Giannakos Filippos
                pthread_mutex_unlock(&store->cache_lock);
221 ff5b1cbf Giannakos Filippos
                goto start;
222 ff5b1cbf Giannakos Filippos
        }
223 ff5b1cbf Giannakos Filippos
        if (store->fdcache[lru].ref){
224 ff5b1cbf Giannakos Filippos
                fd = -1;
225 0072a4af Vangelis Koukis
                printf("lru(%ld) ref not 0 (%u)\n", lru, store->fdcache[lru].ref);
226 ff5b1cbf Giannakos Filippos
                goto out_err_unlock;
227 ff5b1cbf Giannakos Filippos
        }
228 0b67b2ab Giannakos Filippos
        /* make room for new file */
229 0072a4af Vangelis Koukis
        ce = &store->fdcache[lru];
230 0b67b2ab Giannakos Filippos
        /* set name here and state to not ready, for any other requests on the
231 0b67b2ab Giannakos Filippos
         * same target that may follow
232 0b67b2ab Giannakos Filippos
         */
233 0072a4af Vangelis Koukis
        strncpy(ce->name, name, namesize);
234 0072a4af Vangelis Koukis
        ce->name[namesize] = 0;
235 0072a4af Vangelis Koukis
        ce->flags &= ~READY;
236 ff5b1cbf Giannakos Filippos
        pthread_mutex_unlock(&store->cache_lock);
237 ff5b1cbf Giannakos Filippos
238 0072a4af Vangelis Koukis
        if (ce->fd >0){
239 0072a4af Vangelis Koukis
                if (close(ce->fd) < 0){
240 0b67b2ab Giannakos Filippos
                        perror("close");
241 ff5b1cbf Giannakos Filippos
                }
242 ff5b1cbf Giannakos Filippos
        }
243 0072a4af Vangelis Koukis
        fd = openat(store->dirfd, ce->name, O_RDWR);        
244 ff5b1cbf Giannakos Filippos
        if (fd < 0) {
245 ff5b1cbf Giannakos Filippos
                if (errno == ENOENT){
246 0072a4af Vangelis Koukis
                        fd = openat(store->dirfd, ce->name, 
247 ff5b1cbf Giannakos Filippos
                                        O_RDWR | O_CREAT, 0600);
248 ff5b1cbf Giannakos Filippos
                        if (fd >= 0)
249 ff5b1cbf Giannakos Filippos
                                goto new_entry;
250 ff5b1cbf Giannakos Filippos
                }
251 ff5b1cbf Giannakos Filippos
                perror(store->path);
252 0b67b2ab Giannakos Filippos
                /* insert in cache a negative fd to indicate opening error to
253 0b67b2ab Giannakos Filippos
                 * any other ios waiting for the file to open
254 0b67b2ab Giannakos Filippos
                 */
255 ff5b1cbf Giannakos Filippos
        }        
256 0b67b2ab Giannakos Filippos
        /* insert in cache */
257 ff5b1cbf Giannakos Filippos
new_entry:
258 ff5b1cbf Giannakos Filippos
        pthread_mutex_lock(&store->cache_lock);
259 0072a4af Vangelis Koukis
        ce->fd = fd;
260 0072a4af Vangelis Koukis
        ce->ref = 0;
261 0072a4af Vangelis Koukis
        ce->flags = READY;
262 0072a4af Vangelis Koukis
        pthread_cond_broadcast(&ce->cond);
263 ff5b1cbf Giannakos Filippos
        if (fd > 0) {
264 ff5b1cbf Giannakos Filippos
                io->fdcacheidx = lru;
265 ff5b1cbf Giannakos Filippos
        }
266 ff5b1cbf Giannakos Filippos
        else {
267 ff5b1cbf Giannakos Filippos
                io->fdcacheidx = -1;
268 ff5b1cbf Giannakos Filippos
                goto out_err_unlock;
269 ff5b1cbf Giannakos Filippos
        }
270 ff5b1cbf Giannakos Filippos
271 ff5b1cbf Giannakos Filippos
out:
272 ff5b1cbf Giannakos Filippos
        store->handled_reqs++;
273 0072a4af Vangelis Koukis
        ce->time = store->handled_reqs;
274 0072a4af Vangelis Koukis
        __sync_fetch_and_add(&ce->ref, 1);
275 ff5b1cbf Giannakos Filippos
        pthread_mutex_unlock(&store->cache_lock);
276 ff5b1cbf Giannakos Filippos
out_err:
277 ff5b1cbf Giannakos Filippos
        return fd;
278 ff5b1cbf Giannakos Filippos
279 ff5b1cbf Giannakos Filippos
out_err_unlock:
280 ff5b1cbf Giannakos Filippos
        pthread_mutex_unlock(&store->cache_lock);
281 ff5b1cbf Giannakos Filippos
        goto out_err;
282 ff5b1cbf Giannakos Filippos
}
283 ff5b1cbf Giannakos Filippos
284 ff5b1cbf Giannakos Filippos
static void handle_read_write(struct store *store, struct io *io)
285 ff5b1cbf Giannakos Filippos
{
286 ff5b1cbf Giannakos Filippos
        int r, fd, mode;
287 ff5b1cbf Giannakos Filippos
        struct xseg_request *req = io->req;
288 ff5b1cbf Giannakos Filippos
289 ff5b1cbf Giannakos Filippos
        if (req->op == X_WRITE)
290 ff5b1cbf Giannakos Filippos
                mode = 1;
291 ff5b1cbf Giannakos Filippos
        else
292 ff5b1cbf Giannakos Filippos
                mode = 0;
293 ff5b1cbf Giannakos Filippos
        fd = dir_open(store, io, req->name, req->namesize, mode);
294 ff5b1cbf Giannakos Filippos
        if (fd < 0){
295 ff5b1cbf Giannakos Filippos
                perror("dir_open");
296 ff5b1cbf Giannakos Filippos
                fail(store, io);
297 ff5b1cbf Giannakos Filippos
                return;
298 ff5b1cbf Giannakos Filippos
        }
299 ff5b1cbf Giannakos Filippos
300 ff5b1cbf Giannakos Filippos
        if (req != io->req)
301 ff5b1cbf Giannakos Filippos
                printf("0.%p vs %p!\n", (void *)req, (void *)io->req);
302 ff5b1cbf Giannakos Filippos
        if (!req->size) {
303 ff5b1cbf Giannakos Filippos
                if (req->flags & (XF_FLUSH | XF_FUA)) {
304 0b67b2ab Giannakos Filippos
                        /* No FLUSH/FUA support yet (O_SYNC ?).
305 ff5b1cbf Giannakos Filippos
                         * note that with FLUSH/size == 0 
306 ff5b1cbf Giannakos Filippos
                         * there will probably be a (uint64_t)-1 offset */
307 0b67b2ab Giannakos Filippos
                        complete(store, io);
308 ff5b1cbf Giannakos Filippos
                        return;
309 ff5b1cbf Giannakos Filippos
                } else {
310 ff5b1cbf Giannakos Filippos
                        complete(store, io);
311 ff5b1cbf Giannakos Filippos
                        return;
312 ff5b1cbf Giannakos Filippos
                }
313 ff5b1cbf Giannakos Filippos
        }
314 ff5b1cbf Giannakos Filippos
315 ff5b1cbf Giannakos Filippos
316 ff5b1cbf Giannakos Filippos
        prepare_io(store, io);
317 ff5b1cbf Giannakos Filippos
318 ff5b1cbf Giannakos Filippos
        switch (req->op) {
319 ff5b1cbf Giannakos Filippos
        case X_READ:
320 ff5b1cbf Giannakos Filippos
                while (req->serviced < req->datasize) {
321 ff5b1cbf Giannakos Filippos
                        r = pread(fd, req->data + req->serviced, 
322 ff5b1cbf Giannakos Filippos
                                        req->datasize - req->serviced,
323 ff5b1cbf Giannakos Filippos
                                               req->offset + req->serviced);
324 ff5b1cbf Giannakos Filippos
                        if (r < 0) {
325 ff5b1cbf Giannakos Filippos
                                req->datasize = req->serviced;
326 ff5b1cbf Giannakos Filippos
                                perror("pread");
327 ff5b1cbf Giannakos Filippos
                        }
328 ff5b1cbf Giannakos Filippos
                        else if (r == 0) {
329 ff5b1cbf Giannakos Filippos
                                /* reached end of file. zero out the rest data buffer */
330 ff5b1cbf Giannakos Filippos
                                memset(req->data + req->serviced, 0, req->datasize - req->serviced);
331 ff5b1cbf Giannakos Filippos
                                req->serviced = req->datasize;
332 ff5b1cbf Giannakos Filippos
                        }
333 ff5b1cbf Giannakos Filippos
                        else {
334 ff5b1cbf Giannakos Filippos
                                req->serviced += r;
335 ff5b1cbf Giannakos Filippos
                        }
336 ff5b1cbf Giannakos Filippos
                }
337 ff5b1cbf Giannakos Filippos
                break;
338 ff5b1cbf Giannakos Filippos
        case X_WRITE:
339 ff5b1cbf Giannakos Filippos
                while (req->serviced < req->datasize) {
340 ff5b1cbf Giannakos Filippos
                        r = pwrite(fd, req->data + req->serviced, 
341 ff5b1cbf Giannakos Filippos
                                        req->datasize - req->serviced,
342 ff5b1cbf Giannakos Filippos
                                               req->offset + req->serviced);
343 ff5b1cbf Giannakos Filippos
                        if (r < 0) {
344 ff5b1cbf Giannakos Filippos
                                req->datasize = req->serviced;
345 ff5b1cbf Giannakos Filippos
                        }
346 ff5b1cbf Giannakos Filippos
                        else if (r == 0) {
347 ff5b1cbf Giannakos Filippos
                                /* reached end of file. zero out the rest data buffer */
348 ff5b1cbf Giannakos Filippos
                                memset(req->data + req->serviced, 0, req->datasize - req->serviced);
349 ff5b1cbf Giannakos Filippos
                                req->serviced = req->datasize;
350 ff5b1cbf Giannakos Filippos
                        }
351 ff5b1cbf Giannakos Filippos
                        else {
352 ff5b1cbf Giannakos Filippos
                                req->serviced += r;
353 ff5b1cbf Giannakos Filippos
                        }
354 ff5b1cbf Giannakos Filippos
                }
355 0b67b2ab Giannakos Filippos
                r = fsync(fd);
356 0b67b2ab Giannakos Filippos
                if (r< 0) {
357 0b67b2ab Giannakos Filippos
                        perror("fsync");
358 0b67b2ab Giannakos Filippos
                        /* if fsync fails, then no bytes serviced correctly */
359 0b67b2ab Giannakos Filippos
                        req->serviced = 0;
360 0b67b2ab Giannakos Filippos
                }
361 ff5b1cbf Giannakos Filippos
                break;
362 ff5b1cbf Giannakos Filippos
        default:
363 ff5b1cbf Giannakos Filippos
                snprintf(req->data, req->datasize,
364 ff5b1cbf Giannakos Filippos
                         "wtf, corrupt op %u?\n", req->op);
365 ff5b1cbf Giannakos Filippos
                fail(store, io);
366 ff5b1cbf Giannakos Filippos
                return;
367 ff5b1cbf Giannakos Filippos
        }
368 ff5b1cbf Giannakos Filippos
369 ff5b1cbf Giannakos Filippos
        if (req->serviced > 0 ) {
370 ff5b1cbf Giannakos Filippos
                complete(store, io);
371 ff5b1cbf Giannakos Filippos
        }
372 ff5b1cbf Giannakos Filippos
        else {
373 ff5b1cbf Giannakos Filippos
                strerror_r(errno, req->data, req->datasize);
374 ff5b1cbf Giannakos Filippos
                fail(store, io);
375 ff5b1cbf Giannakos Filippos
        }
376 ff5b1cbf Giannakos Filippos
        return;
377 ff5b1cbf Giannakos Filippos
}
378 ff5b1cbf Giannakos Filippos
379 bea63568 Giannakos Filippos
static void handle_info(struct store *store, struct io *io)
380 bea63568 Giannakos Filippos
{
381 bea63568 Giannakos Filippos
        struct xseg_request *req = io->req;
382 bea63568 Giannakos Filippos
        struct stat stat;
383 bea63568 Giannakos Filippos
        int fd, r;
384 bea63568 Giannakos Filippos
        off_t size;
385 bea63568 Giannakos Filippos
386 5eae9d27 Giannakos Filippos
        fd = dir_open(store, io, req->name, req->namesize, 0);
387 bea63568 Giannakos Filippos
        if (fd < 0) {
388 bea63568 Giannakos Filippos
                fail(store, io);
389 bea63568 Giannakos Filippos
                return;
390 bea63568 Giannakos Filippos
        }
391 bea63568 Giannakos Filippos
        r = fstat(fd, &stat);
392 bea63568 Giannakos Filippos
        if (r < 0) {
393 bea63568 Giannakos Filippos
                perror("fstat");
394 bea63568 Giannakos Filippos
                fail(store, io);
395 bea63568 Giannakos Filippos
                return;
396 bea63568 Giannakos Filippos
        }
397 bea63568 Giannakos Filippos
        size = stat.st_size;
398 bea63568 Giannakos Filippos
        *((off_t *) req->data) = size;
399 bea63568 Giannakos Filippos
        req->datasize = sizeof(size);
400 bea63568 Giannakos Filippos
401 bea63568 Giannakos Filippos
        complete(store, io);
402 bea63568 Giannakos Filippos
}
403 bea63568 Giannakos Filippos
404 ff5b1cbf Giannakos Filippos
static void dispatch(struct store *store, struct io *io)
405 ff5b1cbf Giannakos Filippos
{
406 0072a4af Vangelis Koukis
        printf("io: 0x%p, req: 0x%p, op %u\n",
407 0072a4af Vangelis Koukis
                (void *)io, (void *)io->req, io->req->op);
408 ff5b1cbf Giannakos Filippos
        switch (io->req->op) {
409 ff5b1cbf Giannakos Filippos
        case X_READ:
410 ff5b1cbf Giannakos Filippos
        case X_WRITE:
411 ff5b1cbf Giannakos Filippos
                handle_read_write(store, io); break;
412 bea63568 Giannakos Filippos
        case X_INFO:
413 bea63568 Giannakos Filippos
                handle_info(store, io); break;
414 ff5b1cbf Giannakos Filippos
        case X_SYNC:
415 ff5b1cbf Giannakos Filippos
        default:
416 ff5b1cbf Giannakos Filippos
                handle_unknown(store, io);
417 ff5b1cbf Giannakos Filippos
        }
418 ff5b1cbf Giannakos Filippos
}
419 ff5b1cbf Giannakos Filippos
420 ff5b1cbf Giannakos Filippos
static void handle_accepted(struct store *store, struct io *io)
421 ff5b1cbf Giannakos Filippos
{
422 ff5b1cbf Giannakos Filippos
        struct xseg_request *req = io->req;
423 ff5b1cbf Giannakos Filippos
        req->serviced = 0;
424 ff5b1cbf Giannakos Filippos
        req->state = XS_ACCEPTED;
425 ff5b1cbf Giannakos Filippos
        io->retval = 0;
426 ff5b1cbf Giannakos Filippos
        dispatch(store, io);
427 ff5b1cbf Giannakos Filippos
}
428 ff5b1cbf Giannakos Filippos
429 0b67b2ab Giannakos Filippos
static struct io* wake_up_next_iothread(struct store *store)
430 0b67b2ab Giannakos Filippos
{
431 0b67b2ab Giannakos Filippos
        struct io *io = alloc_io(store);
432 0b67b2ab Giannakos Filippos
        if (io){        
433 0b67b2ab Giannakos Filippos
                pthread_cond_signal(&io->cond);
434 0b67b2ab Giannakos Filippos
        }
435 0b67b2ab Giannakos Filippos
        return io;
436 0b67b2ab Giannakos Filippos
}
437 0b67b2ab Giannakos Filippos
438 ff5b1cbf Giannakos Filippos
void *io_loop(void *arg)
439 ff5b1cbf Giannakos Filippos
{
440 ff5b1cbf Giannakos Filippos
        struct io *io = (struct io *) arg;
441 ff5b1cbf Giannakos Filippos
        struct store *store = io->store;
442 ff5b1cbf Giannakos Filippos
        struct xseg *xseg = store->xseg;
443 ff5b1cbf Giannakos Filippos
        uint32_t portno = store->portno;
444 ff5b1cbf Giannakos Filippos
        struct xseg_request *accepted;
445 ff5b1cbf Giannakos Filippos
446 ff5b1cbf Giannakos Filippos
        for (;;) {
447 ff5b1cbf Giannakos Filippos
                accepted = NULL;
448 ff5b1cbf Giannakos Filippos
                accepted = xseg_accept(xseg, portno);
449 ff5b1cbf Giannakos Filippos
                if (accepted) {
450 ff5b1cbf Giannakos Filippos
                        io->req = accepted;
451 0b67b2ab Giannakos Filippos
                        wake_up_next_iothread(store);
452 ff5b1cbf Giannakos Filippos
                        handle_accepted(store, io);
453 ff5b1cbf Giannakos Filippos
                }
454 0b67b2ab Giannakos Filippos
                else {
455 0b67b2ab Giannakos Filippos
                        free_io(store, io);
456 0b67b2ab Giannakos Filippos
                        pthread_mutex_lock(&io->lock);
457 0b67b2ab Giannakos Filippos
                        pthread_cond_wait(&io->cond, &io->lock);
458 0b67b2ab Giannakos Filippos
                        pthread_mutex_unlock(&io->lock);
459 ff5b1cbf Giannakos Filippos
                }
460 ff5b1cbf Giannakos Filippos
        }
461 ff5b1cbf Giannakos Filippos
462 ff5b1cbf Giannakos Filippos
        return NULL;
463 ff5b1cbf Giannakos Filippos
}
464 ff5b1cbf Giannakos Filippos
465 ff5b1cbf Giannakos Filippos
static struct xseg *join(char *spec)
466 ff5b1cbf Giannakos Filippos
{
467 ff5b1cbf Giannakos Filippos
        struct xseg_config config;
468 ff5b1cbf Giannakos Filippos
        struct xseg *xseg;
469 ff5b1cbf Giannakos Filippos
470 ff5b1cbf Giannakos Filippos
        (void)xseg_parse_spec(spec, &config);
471 ff5b1cbf Giannakos Filippos
        xseg = xseg_join(config.type, config.name);
472 ff5b1cbf Giannakos Filippos
        if (xseg)
473 ff5b1cbf Giannakos Filippos
                return xseg;
474 ff5b1cbf Giannakos Filippos
475 0072a4af Vangelis Koukis
        fprintf(stderr, "Failed to join xseg, creating it...\n");
476 ff5b1cbf Giannakos Filippos
        (void)xseg_create(&config);
477 ff5b1cbf Giannakos Filippos
        return xseg_join(config.type, config.name);
478 ff5b1cbf Giannakos Filippos
}
479 ff5b1cbf Giannakos Filippos
480 0b67b2ab Giannakos Filippos
static int filed_loop(struct store *store)
481 0b67b2ab Giannakos Filippos
{
482 0b67b2ab Giannakos Filippos
        struct xseg *xseg = store->xseg;
483 0b67b2ab Giannakos Filippos
        uint32_t portno = store->portno;
484 0b67b2ab Giannakos Filippos
        struct io *io;
485 0b67b2ab Giannakos Filippos
486 0b67b2ab Giannakos Filippos
        for (;;) {
487 0b67b2ab Giannakos Filippos
                io = wake_up_next_iothread(store);
488 0b67b2ab Giannakos Filippos
                xseg_prepare_wait(xseg, portno);
489 0b67b2ab Giannakos Filippos
                xseg_wait_signal(xseg, portno, 10000);
490 0b67b2ab Giannakos Filippos
        }
491 0b67b2ab Giannakos Filippos
        return 0;
492 0b67b2ab Giannakos Filippos
}
493 0b67b2ab Giannakos Filippos
494 0072a4af Vangelis Koukis
static int filed(        char *path, unsigned long size, uint32_t nr_ops,
495 0072a4af Vangelis Koukis
                        char *spec, long portno        )
496 ff5b1cbf Giannakos Filippos
{
497 ff5b1cbf Giannakos Filippos
        struct stat stat;
498 ff5b1cbf Giannakos Filippos
        struct sigaction sa;
499 ff5b1cbf Giannakos Filippos
        struct store *store;
500 ff5b1cbf Giannakos Filippos
        int r, mode, i;
501 0b67b2ab Giannakos Filippos
        void *status;
502 ff5b1cbf Giannakos Filippos
503 ff5b1cbf Giannakos Filippos
        store = malloc(sizeof(struct store));
504 ff5b1cbf Giannakos Filippos
        if (!store) {
505 ff5b1cbf Giannakos Filippos
                perror("malloc");
506 ff5b1cbf Giannakos Filippos
                return -1;
507 ff5b1cbf Giannakos Filippos
        }
508 ff5b1cbf Giannakos Filippos
509 ff5b1cbf Giannakos Filippos
510 ff5b1cbf Giannakos Filippos
        /*
511 ff5b1cbf Giannakos Filippos
        r = daemon(1, 1);
512 ff5b1cbf Giannakos Filippos
        if (r < 0)
513 ff5b1cbf Giannakos Filippos
                return r;
514 ff5b1cbf Giannakos Filippos
                */
515 ff5b1cbf Giannakos Filippos
516 ff5b1cbf Giannakos Filippos
        store->sigevent.sigev_notify = SIGEV_SIGNAL;
517 ff5b1cbf Giannakos Filippos
        store->sigevent.sigev_signo = SIGIO;
518 ff5b1cbf Giannakos Filippos
        sa.sa_sigaction = sigaction_handler;
519 ff5b1cbf Giannakos Filippos
        sa.sa_flags = SA_SIGINFO;
520 ff5b1cbf Giannakos Filippos
        if (sigemptyset(&sa.sa_mask))
521 ff5b1cbf Giannakos Filippos
                perror("sigemptyset");
522 ff5b1cbf Giannakos Filippos
523 ff5b1cbf Giannakos Filippos
        if (sigaction(SIGIO, &sa, NULL)) {
524 ff5b1cbf Giannakos Filippos
                perror("sigaction");
525 ff5b1cbf Giannakos Filippos
                return -1;
526 ff5b1cbf Giannakos Filippos
        }
527 ff5b1cbf Giannakos Filippos
528 ff5b1cbf Giannakos Filippos
        store->nr_ops = nr_ops;
529 ff5b1cbf Giannakos Filippos
        store->maxfds = 2 * nr_ops;
530 ff5b1cbf Giannakos Filippos
531 0072a4af Vangelis Koukis
        store->fdcache = calloc(store->maxfds, sizeof(struct fdcache_node));
532 ff5b1cbf Giannakos Filippos
        if(!store->fdcache)
533 ff5b1cbf Giannakos Filippos
                goto malloc_fail;
534 ff5b1cbf Giannakos Filippos
535 0b67b2ab Giannakos Filippos
        store->free_bufs = calloc(store->nr_ops, sizeof(xqindex));
536 0b67b2ab Giannakos Filippos
        if(!store->free_bufs)
537 ff5b1cbf Giannakos Filippos
                goto malloc_fail;
538 ff5b1cbf Giannakos Filippos
539 ff5b1cbf Giannakos Filippos
        store->iothread = calloc(store->nr_ops, sizeof(pthread_t));
540 ff5b1cbf Giannakos Filippos
        if(!store->iothread)
541 ff5b1cbf Giannakos Filippos
                goto malloc_fail;
542 ff5b1cbf Giannakos Filippos
543 ff5b1cbf Giannakos Filippos
        store->ios = calloc(nr_ops, sizeof(struct io));
544 ff5b1cbf Giannakos Filippos
        if (!store->ios) {
545 ff5b1cbf Giannakos Filippos
malloc_fail:
546 ff5b1cbf Giannakos Filippos
                perror("malloc");
547 ff5b1cbf Giannakos Filippos
                return -1;
548 ff5b1cbf Giannakos Filippos
        }
549 ff5b1cbf Giannakos Filippos
550 ff5b1cbf Giannakos Filippos
        for (i = 0; i < nr_ops; i++) {
551 ff5b1cbf Giannakos Filippos
                store->ios[i].store = store;
552 0b67b2ab Giannakos Filippos
                pthread_cond_init(&store->ios[i].cond, NULL);
553 0b67b2ab Giannakos Filippos
                pthread_mutex_init(&store->ios[i].lock, NULL);
554 ff5b1cbf Giannakos Filippos
        }
555 ff5b1cbf Giannakos Filippos
556 0b67b2ab Giannakos Filippos
        xq_init_seq(&store->free_ops, store->nr_ops, store->nr_ops, store->free_bufs);
557 ff5b1cbf Giannakos Filippos
558 ff5b1cbf Giannakos Filippos
        store->handled_reqs = 0;
559 ff5b1cbf Giannakos Filippos
        strncpy(store->path, path, MAX_PATH_SIZE);
560 ff5b1cbf Giannakos Filippos
        store->path[MAX_PATH_SIZE] = 0;
561 ff5b1cbf Giannakos Filippos
562 ff5b1cbf Giannakos Filippos
        store->path_len = strlen(store->path);
563 ff5b1cbf Giannakos Filippos
        if (store->path[store->path_len -1] != '/'){
564 ff5b1cbf Giannakos Filippos
                store->path[store->path_len] = '/';
565 ff5b1cbf Giannakos Filippos
                store->path[++store->path_len]= 0;
566 ff5b1cbf Giannakos Filippos
        }
567 ff5b1cbf Giannakos Filippos
        store->dirfd = open(store->path, O_RDWR);
568 ff5b1cbf Giannakos Filippos
        if (!(store->dirfd < 0 && errno == EISDIR)){
569 ff5b1cbf Giannakos Filippos
                fprintf(stderr, "%s is not a directory\n", store->path);
570 ff5b1cbf Giannakos Filippos
                return -1;
571 ff5b1cbf Giannakos Filippos
        }
572 ff5b1cbf Giannakos Filippos
573 ff5b1cbf Giannakos Filippos
        store->dirfd = open(store->path, O_RDONLY);
574 ff5b1cbf Giannakos Filippos
        if (store->dirfd < 0){
575 ff5b1cbf Giannakos Filippos
                perror("Directory open");
576 ff5b1cbf Giannakos Filippos
                return -1;
577 ff5b1cbf Giannakos Filippos
        }
578 ff5b1cbf Giannakos Filippos
/*
579 ff5b1cbf Giannakos Filippos
        mode = 1;
580 ff5b1cbf Giannakos Filippos
        int fd = dir_open(store, ".__tmp", 6, 1);
581 ff5b1cbf Giannakos Filippos
        if (fd < 0){
582 ff5b1cbf Giannakos Filippos
                perror("Directory check");
583 ff5b1cbf Giannakos Filippos
                return -1;
584 ff5b1cbf Giannakos Filippos
        }
585 ff5b1cbf Giannakos Filippos
*/
586 ff5b1cbf Giannakos Filippos
        if (xseg_initialize("posix")) {
587 ff5b1cbf Giannakos Filippos
                printf("cannot initialize library\n");
588 ff5b1cbf Giannakos Filippos
                return -1;
589 ff5b1cbf Giannakos Filippos
        }
590 ff5b1cbf Giannakos Filippos
        store->xseg = join(spec);
591 ff5b1cbf Giannakos Filippos
        if (!store->xseg)
592 ff5b1cbf Giannakos Filippos
                return -1;
593 ff5b1cbf Giannakos Filippos
594 ff5b1cbf Giannakos Filippos
        store->xport = xseg_bind_port(store->xseg, portno);
595 ff5b1cbf Giannakos Filippos
        if (!store->xport) {
596 ff5b1cbf Giannakos Filippos
                printf("cannot bind to port %ld\n", portno);
597 ff5b1cbf Giannakos Filippos
                return -1;
598 ff5b1cbf Giannakos Filippos
        }
599 ff5b1cbf Giannakos Filippos
600 ff5b1cbf Giannakos Filippos
        store->portno = xseg_portno(store->xseg, store->xport);
601 0072a4af Vangelis Koukis
        printf("filed on port %u/%u\n",
602 ff5b1cbf Giannakos Filippos
                store->portno, store->xseg->config.nr_ports);
603 ff5b1cbf Giannakos Filippos
604 ff5b1cbf Giannakos Filippos
        for (i = 0; i < nr_ops; i++) {
605 ff5b1cbf Giannakos Filippos
                pthread_cond_init(&store->fdcache[i].cond, NULL);
606 ff5b1cbf Giannakos Filippos
                store->fdcache[i].flags = READY;
607 ff5b1cbf Giannakos Filippos
        }
608 ff5b1cbf Giannakos Filippos
        for (i = 0; i < nr_ops; i++) {
609 ff5b1cbf Giannakos Filippos
                //TODO error check + cond variable to stop io from starting
610 ff5b1cbf Giannakos Filippos
                //unless all threads are created successfully
611 ff5b1cbf Giannakos Filippos
                pthread_create(store->iothread + i, NULL, io_loop, (void *) (store->ios + i));
612 ff5b1cbf Giannakos Filippos
        }
613 ff5b1cbf Giannakos Filippos
        pthread_mutex_init(&store->cache_lock, NULL);
614 0b67b2ab Giannakos Filippos
        return filed_loop(store);
615 ff5b1cbf Giannakos Filippos
}
616 ff5b1cbf Giannakos Filippos
617 ff5b1cbf Giannakos Filippos
int main(int argc, char **argv)
618 ff5b1cbf Giannakos Filippos
{
619 ff5b1cbf Giannakos Filippos
        char *path, *spec = "";
620 ff5b1cbf Giannakos Filippos
        unsigned long size;
621 ff5b1cbf Giannakos Filippos
        int i;
622 ff5b1cbf Giannakos Filippos
        long portno;
623 ff5b1cbf Giannakos Filippos
        uint32_t nr_ops;
624 ff5b1cbf Giannakos Filippos
625 ff5b1cbf Giannakos Filippos
        if (argc < 2)
626 ff5b1cbf Giannakos Filippos
                return usage();
627 ff5b1cbf Giannakos Filippos
628 ff5b1cbf Giannakos Filippos
        path = argv[1];
629 ff5b1cbf Giannakos Filippos
        size = 0;
630 ff5b1cbf Giannakos Filippos
        portno = -1;
631 ff5b1cbf Giannakos Filippos
        nr_ops = 0;
632 ff5b1cbf Giannakos Filippos
633 ff5b1cbf Giannakos Filippos
        for (i = 2; i < argc; i++) {
634 ff5b1cbf Giannakos Filippos
                if (!strcmp(argv[i], "-g") && i + 1 < argc) {
635 ff5b1cbf Giannakos Filippos
                        spec = argv[i+1];
636 ff5b1cbf Giannakos Filippos
                        i += 1;
637 ff5b1cbf Giannakos Filippos
                        continue;
638 ff5b1cbf Giannakos Filippos
                }
639 ff5b1cbf Giannakos Filippos
640 ff5b1cbf Giannakos Filippos
                if (!strcmp(argv[i], "-p") && i + 1 < argc) {
641 ff5b1cbf Giannakos Filippos
                        portno = strtoul(argv[i+1], NULL, 10);
642 ff5b1cbf Giannakos Filippos
                        i += 1;
643 ff5b1cbf Giannakos Filippos
                        continue;
644 ff5b1cbf Giannakos Filippos
                }
645 ff5b1cbf Giannakos Filippos
646 ff5b1cbf Giannakos Filippos
                if (!strcmp(argv[i], "-n") && i + 1 < argc) {
647 ff5b1cbf Giannakos Filippos
                        nr_ops = strtoul(argv[i+1], NULL, 10);
648 ff5b1cbf Giannakos Filippos
                        i += 1;
649 ff5b1cbf Giannakos Filippos
                        continue;
650 ff5b1cbf Giannakos Filippos
                }
651 ff5b1cbf Giannakos Filippos
        }
652 ff5b1cbf Giannakos Filippos
653 ff5b1cbf Giannakos Filippos
        if (nr_ops <= 0)
654 ff5b1cbf Giannakos Filippos
                nr_ops = 16;
655 ff5b1cbf Giannakos Filippos
656 0072a4af Vangelis Koukis
        return filed(path, size, nr_ops, spec, portno);
657 ff5b1cbf Giannakos Filippos
}