Add timer implementation
[archipelago] / xseg / peers / user / bench-xseg.c
1 /*
2  * Copyright 2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *   2. Redistributions in binary form must reproduce the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer in the documentation and/or other materials
14  *      provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  *
29  * The views and conclusions contained in the software and
30  * documentation are those of the authors and should not be
31  * interpreted as representing official policies, either expressed
32  * or implied, of GRNET S.A.
33  */
34
35 #define _GNU_SOURCE
36 #include <stdio.h>
37 #include <stdlib.h>
38 #include <unistd.h>
39 #include <sys/syscall.h>
40 #include <sys/types.h>
41 #include <pthread.h>
42 #include <xseg/xseg.h>
43 #include <peer.h>
44 #include <time.h>
45 #include <sys/util.h>
46 #include <signal.h>
47 #include <bench-xseg.h>
48
49 struct timespec delay = {0, 4000000};
50
51 void custom_peer_usage()
52 {
53         fprintf(stderr, "Custom peer options: \n"
54                 "  --------------------------------------------\n"
55                 "    -ts       | None    | Total I/O size\n"
56                 "    -os       | 4M      | Object size\n"
57                 "    -bs       | 4k      | Block size\n"
58                 "    -dp       | None    | Destination port\n"
59                 "    --iodepth | 1       | Number of in-flight I/O requests\n"
60                 "\n");
61 }
62
63 /*
64  * Convert string to size in bytes.
65  * If syntax is invalid, return 0. Values such as zero and non-integer
66  * multiples of segment's page size should not be accepted.
67  */
68 static uint64_t str2num(char *str)
69 {
70         char *unit;
71         uint64_t num;
72
73         num = strtoll(str, &unit, 10);
74         if (strlen(unit) > 1) //Invalid syntax
75                 return 0;
76         else if (strlen(unit) < 1) //Plain number in bytes
77                 return num;
78
79         switch (*unit) {
80                 case 'g':
81                 case 'G':
82                         num *= 1024;
83                 case 'm':
84                 case 'M':
85                         num *= 1024;
86                 case 'k':
87                 case 'K':
88                         num *= 1024;
89                         break;
90                 default:
91                         num = 0;
92         }
93         return num;
94 }
95
96 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
97 {
98         struct bench *prefs;
99         char total_size[MAX_ARG_LEN + 1];
100         char object_size[MAX_ARG_LEN + 1];
101         char block_size[MAX_ARG_LEN + 1];
102         struct xseg *xseg = peer->xseg;
103         unsigned int xseg_page_size = 1 << xseg->config.page_shift;
104         long dst_port = -1;
105
106         total_size[0] = 0;
107         block_size[0] = 0;
108         object_size[0] = 0;
109
110         prefs = malloc(sizeof(struct bench));
111         if (!prefs) {
112                 perror("malloc");
113                 return -1;
114         }
115
116         //Begin reading the benchmark-specific arguments
117         BEGIN_READ_ARGS(argc, argv);
118         READ_ARG_STRING("-ts", total_size, MAX_ARG_LEN);
119         READ_ARG_STRING("-os", object_size, MAX_ARG_LEN);
120         READ_ARG_STRING("-bs", block_size, MAX_ARG_LEN);
121         READ_ARG_ULONG("--iodepth", prefs->iodepth);
122         READ_ARG_ULONG("-dp", dst_port);
123         END_READ_ARGS();
124
125         /*************************
126          * Check size parameters *
127          *************************/
128
129         //Block size (bs): Defaults to 4K.
130         //It must be a number followed by one of these characters: [k|K|m|M|g|G].
131         //If not, it will be considered as size in bytes.
132         //Must be integer multiple of segment's page size (typically 4k).
133         if (!block_size[0])
134                 strcpy(block_size,"4k");
135
136         prefs->bs = str2num(block_size);
137         if (!prefs->bs) {
138                 XSEGLOG2(&lc, E, "Invalid syntax: %s\n", block_size);
139                 goto arg_fail;
140         } else if (prefs->bs % xseg_page_size) {
141                 XSEGLOG2(&lc, E, "Misaligned block size: %s\n", block_size);
142                 goto arg_fail;
143         }
144
145         //Total I/O size (ts): Must be supplied by user.
146         //Must have the same format as "total size"
147         //Must be integer multiple of "block size"
148         if (!total_size[0]) {
149                 XSEGLOG2(&lc, E, "Total I/O size needs to be supplied\n");
150                 goto arg_fail;
151         }
152
153         prefs->ts = str2num(total_size);
154         if (!prefs->ts) {
155                 XSEGLOG2(&lc, E, "Invalid syntax: %s\n", total_size);
156                 goto arg_fail;
157         } else if (prefs->ts % prefs->bs) {
158                 XSEGLOG2(&lc, E, "Misaligned total I/O size: %s\n", total_size);
159                 goto arg_fail;
160         } else if (prefs->ts > xseg->segment_size) {
161                 XSEGLOG2(&lc, E, "Total I/O size exceeds segment size\n", total_size);
162                 goto arg_fail;
163         }
164
165         //Object size (os): Defaults to 4M.
166         //Must have the same format as "total size"
167         //Must be integer multiple of "block size"
168         if (!object_size[0])
169                 strcpy(object_size,"4M");
170
171         prefs->os = str2num(object_size);
172         if (!prefs->os) {
173                 XSEGLOG2(&lc, E, "Invalid syntax: %s\n", object_size);
174                 goto arg_fail;
175         } else if (prefs->os % prefs->bs) {
176                 XSEGLOG2(&lc, E, "Misaligned object size: %s\n", object_size);
177                 goto arg_fail;
178         }
179
180         /*************************
181          * Check port parameters *
182          *************************/
183
184         if (dst_port < 0){
185                 XSEGLOG2(&lc, E, "Destination port needs to be supplied\n");
186                 goto arg_fail;
187         }
188
189         prefs->src_port = peer->portno_start; //TODO: allow user to change this
190         prefs->dst_port = (xport) dst_port;
191
192         /**************************
193          * Customize struct peerd *
194          **************************/
195
196         prefs->total_tm = malloc(sizeof(struct timer));
197         prefs->get_tm = malloc(sizeof(struct timer));
198         prefs->sub_tm = malloc(sizeof(struct timer));
199         prefs->rec_tm = malloc(sizeof(struct timer));
200         if (!prefs->total_tm || !prefs->get_tm || !prefs->sub_tm ||
201                         !prefs->rec_tm) {
202                 perror("malloc");
203                 return -1;
204         }
205
206         peer->custom_peerd_loop = custom_peerd_loop;
207         peer->priv = (void *) prefs;
208         return 0;
209
210 arg_fail:
211         free(prefs);
212         custom_peer_usage();
213         return -1;
214 }
215
216
217 int send_request(struct peerd *peer, struct bench *prefs)
218 {
219         struct xseg_request *req;
220         struct xseg *xseg = peer->xseg;
221         xport srcport = prefs->src_port;
222         xport dstport = prefs->dst_port;
223         xport p;
224
225         int r;
226         uint32_t targetlen=10; //FIXME: handle it better
227         uint64_t size = prefs->os;
228
229         //srcport and dstport must already be provided by the user.
230         //returns struct xseg_request with basic initializations
231         req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
232         if (!req) {
233                 fprintf(stderr, "No request\n");
234                 return -1;
235         }
236
237         //Allocate enough space for the data and the target's name
238         r = xseg_prep_request(xseg, req, targetlen, size);
239         if (r < 0) {
240                 fprintf(stderr, "Cannot prepare request! (%lu, %llu)\n",
241                         (unsigned long)targetlen, (unsigned long long)size);
242                 xseg_put_request(xseg, req, srcport);
243                 return -1;
244         }
245
246 #if 0
247         //TODO: allow strcpy, memcpy
248         //Copy target's name to the newly allocated space
249         req_target = xseg_get_target(xseg, req);
250         strncpy(req_target, target, targetlen);
251
252         //Copy data buffer to the newly allocated space
253         req_data = xseg_get_data(xseg, req);
254         memcpy(req_data, buf, size);
255         req->offset = offset;
256         req->size = size;
257         req->op = X_WRITE;
258 #endif
259
260         //Submit the request from the source port to the target port
261         timer_start(prefs->sub_tm);
262         p = xseg_submit(xseg, req, srcport, X_ALLOC);
263         if (p == NoPort) {
264                 fprintf(stderr, "Cannot submit\n");
265                 return -1;
266         }
267         timer_stop(prefs->sub_tm);
268
269         //Send SIGIO to the process that has binded this port to inform that
270         //IO is possible
271         xseg_signal(xseg, p);
272
273         return 0;
274 }
275
276 /*
277  * This function substitutes the default peerd_loop of peer.c.
278  * It's plugged to struct peerd at custom peer's initialisation
279  */
280 int custom_peerd_loop(struct peerd *peer)
281 {
282 #ifdef MT
283         int i;
284         if (peer->interactive_func)
285                 peer->interactive_func();
286         for (i = 0; i < peer->nr_threads; i++) {
287                 pthread_join(peer->thread[i].tid, NULL);
288         }
289 #else
290         struct xseg *xseg = peer->xseg;
291         struct bench *prefs = peer->priv;
292
293         xport portno_start = peer->portno_start;
294         xport portno_end = peer->portno_end;
295         uint64_t threshold=1000/(1 + portno_end - portno_start);
296         pid_t pid =syscall(SYS_gettid);
297         XSEGLOG2(&lc, I, "Peer has tid %u.\n", pid);
298         xseg_init_local_signal(xseg, peer->portno_start);
299         uint64_t loops;
300
301         uint64_t remaining = prefs->ts;
302
303         while (!isTerminate()
304                         && xq_count(&peer->free_reqs) == peer->nr_ops
305                         && remaining) {
306
307                 while (prefs->sub_tm->completed - prefs->sub_tm->completed <
308                                 prefs->iodepth){
309                         send_request(peer, prefs);
310                 }
311
312                 for (loops = threshold; loops > 0; loops--) {
313                         if (loops == 1)
314                                 xseg_prepare_wait(xseg, peer->portno_start);
315                         if (check_ports(peer))
316                                 loops = threshold;
317                 }
318 #ifdef ST_THREADS
319                 if (ta){
320                         st_sleep(0);
321                         continue;
322                 }
323 #endif
324                 XSEGLOG2(&lc, I, "Peer goes to sleep\n");
325                 xseg_wait_signal(xseg, 10000000UL);
326                 xseg_cancel_wait(xseg, peer->portno_start);
327                 XSEGLOG2(&lc, I, "Peer woke up\n");
328         }
329         custom_peer_finalize(peer);
330         xseg_quit_local_signal(xseg, peer->portno_start);
331 #endif
332         return 0;
333 }
334
335 void custom_peer_finalize(struct peerd *peer)
336 {
337         return;
338 }
339
340 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
341                 enum dispatch_reason reason)
342 {
343         if (canDefer(peer))
344                 defer_request(peer, pr);
345         else {
346 //              printf("completing req id: %u (remote %u)\n", (unsigned int) (pr - peer->peer_reqs), (unsigned int) pr->req->priv);
347 //              nanosleep(&delay,NULL);
348 //              print_req(peer->xseg, pr->req);
349                 complete(peer, pr);
350         }
351         return 0;
352 }