| // SPDX-License-Identifier: GPL-2.0 |
| /* |
| * Copyright (c) 2004 SuSE, Inc. All Rights Reserved. |
| */ |
| |
| /* |
| * aio-stress |
| * |
| * opens or creates each file on the command line and starts a series of |
| * aio requests against it. |
| * |
| * aio is done in a rotating loop: first file1 gets 8 requests, then |
| * file2, then file3, etc. As each file finishes its write stage, it is |
| * switched to reads. |
| * |
| * io buffers are aligned in case you want to do raw (O_DIRECT) io |
| * |
| * compile with: gcc -Wall -o aio-stress aio-stress.c -laio -lpthread |
| * |
| * run aio-stress -h to see the options |
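| * |
| * example run (the file names here are hypothetical): |
| * aio-stress -O -o 0 -o 1 -t 2 /tmp/aiofile1 /tmp/aiofile2 |
| * runs a sequential write stage followed by a sequential read stage |
| * with O_DIRECT, using two worker threads |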
| * |
| * Please mail Chris Mason (mason@suse.com) with bug reports or patches |
| */ |
| #define _FILE_OFFSET_BITS 64 |
| #define PROG_VERSION "0.21" |
| #define NEW_GETEVENTS |
| |
| #include <stdio.h> |
| #include <errno.h> |
| #include <assert.h> |
| #include <stdlib.h> |
| #include <stdint.h> /* intptr_t, used to page align the shared io buffer */ |
| |
| #include <sys/types.h> |
| #include <sys/stat.h> |
| #include <fcntl.h> |
| #include <unistd.h> |
| #include <sys/time.h> |
| #include <libaio.h> |
| #include <sys/ipc.h> |
| #include <sys/shm.h> |
| #include <sys/mman.h> |
| #include <string.h> |
| #include <pthread.h> |
| |
| #define IO_FREE 0 |
| #define IO_PENDING 1 |
| #define RUN_FOREVER -1 |
| |
| #ifndef O_DIRECT |
| #define O_DIRECT 040000 /* direct disk access hint */ |
| #endif |
| |
| enum { |
| WRITE, |
| READ, |
| RWRITE, |
| RREAD, |
| LAST_STAGE, |
| }; |
| |
| #define USE_MALLOC 0 |
| #define USE_SHM 1 |
| #define USE_SHMFS 2 |
| |
| /* |
| * various globals; these are effectively read-only by the time the |
| * threads are started |
| */ |
| long stages = 0; |
| unsigned long page_size_mask; |
| int o_direct = 0; |
| int o_sync = 0; |
| int latency_stats = 0; |
| int completion_latency_stats = 0; |
| int io_iter = 8; |
| int iterations = RUN_FOREVER; |
| int max_io_submit = 0; |
| long rec_len = 64 * 1024; |
| int depth = 64; |
| int num_threads = 1; |
| int num_contexts = 1; |
| off_t context_offset = 2 * 1024 * 1024; |
| int fsync_stages = 1; |
| int use_shm = 0; |
| int shm_id; |
| char *unaligned_buffer = NULL; |
| char *aligned_buffer = NULL; |
| int padded_reclen = 0; |
| int stonewall = 1; |
| int verify = 0; |
| char *verify_buf = NULL; |
| int unlink_files = 0; |
| |
| struct io_unit; |
| struct thread_info; |
| |
| /* pthread mutexes and other globals for keeping the threads in sync */ |
| pthread_cond_t stage_cond = PTHREAD_COND_INITIALIZER; |
| pthread_mutex_t stage_mutex = PTHREAD_MUTEX_INITIALIZER; |
| int threads_ending = 0; |
| int threads_starting = 0; |
| struct timeval global_stage_start_time; |
| struct thread_info *global_thread_info; |
| |
| /* |
| * latencies during io_submit are measured; these are the bucket |
| * boundaries (in milliseconds) for the deviation counts |
| */ |
| #define DEVIATIONS 6 |
| int deviations[DEVIATIONS] = { 100, 250, 500, 1000, 5000, 10000 }; |
| struct io_latency { |
| double max; |
| double min; |
| double total_io; |
| double total_lat; |
| double deviations[DEVIATIONS]; |
| }; |
| |
| /* container for a series of operations to a file */ |
| struct io_oper { |
| /* already open file descriptor, valid for whatever operation you want */ |
| int fd; |
| |
| /* starting byte of the operation */ |
| off_t start; |
| |
| /* ending byte of the operation */ |
| off_t end; |
| |
| /* size of the read/write buffer */ |
| int reclen; |
| |
| /* max number of pending requests before a wait is triggered */ |
| int depth; |
| |
| /* current number of pending requests */ |
| int num_pending; |
| |
| /* last error, zero if there were none */ |
| int last_err; |
| |
| /* total number of errors hit. */ |
| int num_err; |
| |
| /* read, write, random read or random write (stage enum value) */ |
| int rw; |
| |
| /* number of ios that will get sent to aio */ |
| int total_ios; |
| |
| /* number of ios we've already sent */ |
| int started_ios; |
| |
| /* last offset used in an io operation */ |
| off_t last_offset; |
| |
| /* stonewalled = 1 when we got cut off before submitting all our ios */ |
| int stonewalled; |
| |
| /* list management */ |
| struct io_oper *next; |
| struct io_oper *prev; |
| |
| struct timeval start_time; |
| |
| char *file_name; |
| }; |
| |
| /* a single io, and all the tracking needed for it */ |
| struct io_unit { |
| /* note, iocb must go first! */ |
| struct iocb iocb; |
| |
| /* pointer to parent io operation struct */ |
| struct io_oper *io_oper; |
| |
| /* aligned buffer */ |
| char *buf; |
| |
| /* size of the aligned buffer (record size) */ |
| int buf_size; |
| |
| /* state of this io unit (free, pending, done) */ |
| int busy; |
| |
| /* result of last operation */ |
| long res; |
| |
| struct io_unit *next; |
| |
| struct timeval io_start_time; /* time of io_submit */ |
| }; |
| |
| struct thread_info { |
| io_context_t io_ctx; |
| pthread_t tid; |
| |
| /* allocated array of io_unit structs */ |
| struct io_unit *ios; |
| |
| /* list of io units available for io */ |
| struct io_unit *free_ious; |
| |
| /* number of io units in the ios array */ |
| int num_global_ios; |
| |
| /* number of io units in flight */ |
| int num_global_pending; |
| |
| /* preallocated array of iocb pointers, only used in run_active_list */ |
| struct iocb **iocbs; |
| |
| /* preallocated array of events */ |
| struct io_event *events; |
| |
| /* size of the events array */ |
| int num_global_events; |
| |
| /* latency stats for io_submit */ |
| struct io_latency io_submit_latency; |
| |
| /* list of operations still in progress, and of those finished */ |
| struct io_oper *active_opers; |
| struct io_oper *finished_opers; |
| |
| /* number of files this thread is doing io on */ |
| int num_files; |
| |
| /* how much io this thread did in the last stage */ |
| double stage_mb_trans; |
| |
| /* completion latency stats: io time from io_submit until io_getevents */ |
| struct io_latency io_completion_latency; |
| }; |
| |
| /* |
| * return seconds between start_tv and stop_tv in double precision |
| */ |
| static double time_since(struct timeval *start_tv, struct timeval *stop_tv) |
| { |
| double sec, usec; |
| double ret; |
| sec = stop_tv->tv_sec - start_tv->tv_sec; |
| usec = stop_tv->tv_usec - start_tv->tv_usec; |
| if (sec > 0 && usec < 0) { |
| sec--; |
| usec += 1000000; |
| } |
| ret = sec + usec / (double)1000000; |
| if (ret < 0) |
| ret = 0; |
| return ret; |
| } |
| |
| /* |
| * return seconds between start_tv and now in double precision |
| */ |
| static double time_since_now(struct timeval *start_tv) |
| { |
| struct timeval stop_time; |
| gettimeofday(&stop_time, NULL); |
| return time_since(start_tv, &stop_time); |
| } |
| |
| /* |
| * Add latency info to latency struct |
| */ |
| static void calc_latency(struct timeval *start_tv, struct timeval *stop_tv, |
| struct io_latency *lat) |
| { |
| double delta; |
| int i; |
| delta = time_since(start_tv, stop_tv); |
| delta = delta * 1000; |
| |
| if (delta > lat->max) |
| lat->max = delta; |
| if (!lat->min || delta < lat->min) |
| lat->min = delta; |
| lat->total_io++; |
| lat->total_lat += delta; |
| for (i = 0 ; i < DEVIATIONS ; i++) { |
| if (delta < deviations[i]) { |
| lat->deviations[i]++; |
| break; |
| } |
| } |
| } |
| |
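| /* add an operation to the tail of a circular, doubly linked list */ |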
| static void oper_list_add(struct io_oper *oper, struct io_oper **list) |
| { |
| if (!*list) { |
| *list = oper; |
| oper->prev = oper->next = oper; |
| return; |
| } |
| oper->prev = (*list)->prev; |
| oper->next = *list; |
| (*list)->prev->next = oper; |
| (*list)->prev = oper; |
| return; |
| } |
| |
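| /* remove an operation from a circular, doubly linked list */ |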
| static void oper_list_del(struct io_oper *oper, struct io_oper **list) |
| { |
| if ((*list)->next == (*list)->prev && *list == (*list)->next) { |
| *list = NULL; |
| return; |
| } |
| oper->prev->next = oper->next; |
| oper->next->prev = oper->prev; |
| if (*list == oper) |
| *list = oper->next; |
| } |
| |
| /* worker func to check error fields in the io unit */ |
| static int check_finished_io(struct io_unit *io) { |
| int i; |
| if (io->res != io->buf_size) { |
| |
| struct stat s; |
| fstat(io->io_oper->fd, &s); |
| |
| /* |
| * If file size is large enough for the read, then this short |
| * read is an error. |
| */ |
| if ((io->io_oper->rw == READ || io->io_oper->rw == RREAD) && |
| s.st_size > (io->iocb.u.c.offset + io->res)) { |
| |
| fprintf(stderr, "io err %lu (%s) op %d, off %Lu size %d\n", |
| io->res, strerror(-io->res), io->iocb.aio_lio_opcode, |
| io->iocb.u.c.offset, io->buf_size); |
| io->io_oper->last_err = io->res; |
| io->io_oper->num_err++; |
| return -1; |
| } |
| } |
| if (verify && io->io_oper->rw == READ) { |
| if (memcmp(io->buf, verify_buf, io->io_oper->reclen)) { |
| fprintf(stderr, "verify error, file %s offset %Lu contents (offset:bad:good):\n", |
| io->io_oper->file_name, io->iocb.u.c.offset); |
| |
| for (i = 0 ; i < io->io_oper->reclen ; i++) { |
| if (io->buf[i] != verify_buf[i]) { |
| fprintf(stderr, "%d:%c:%c ", i, io->buf[i], verify_buf[i]); |
| } |
| } |
| fprintf(stderr, "\n"); |
| } |
| |
| } |
| return 0; |
| } |
| |
| /* worker func to check the busy bits and get an io unit ready for use */ |
| static int grab_iou(struct io_unit *io, struct io_oper *oper) { |
| if (io->busy == IO_PENDING) |
| return -1; |
| |
| io->busy = IO_PENDING; |
| io->res = 0; |
| io->io_oper = oper; |
| return 0; |
| } |
| |
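| /* human readable name of an io stage */ |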
| char *stage_name(int rw) { |
| switch(rw) { |
| case WRITE: |
| return "write"; |
| case READ: |
| return "read"; |
| case RWRITE: |
| return "random write"; |
| case RREAD: |
| return "random read"; |
| } |
| return "unknown"; |
| } |
| |
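| /* MB transferred so far by a single operation */ |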
| static inline double oper_mb_trans(struct io_oper *oper) { |
| return ((double)oper->started_ios * (double)oper->reclen) / |
| (double)(1024 * 1024); |
| } |
| |
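| /* print the throughput of one operation since its start_time */ |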
| static void print_time(struct io_oper *oper) { |
| double runtime; |
| double tput; |
| double mb; |
| |
| runtime = time_since_now(&oper->start_time); |
| mb = oper_mb_trans(oper); |
| tput = mb / runtime; |
| fprintf(stderr, "%s on %s (%.2f MB/s) %.2f MB in %.2fs\n", |
| stage_name(oper->rw), oper->file_name, tput, mb, runtime); |
| } |
| |
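| /* print min/avg/max and deviation counts for a latency struct, then reset it */ |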
| static void print_lat(char *str, struct io_latency *lat) { |
| double avg = lat->total_lat / lat->total_io; |
| int i; |
| double total_counted = 0; |
| fprintf(stderr, "%s min %.2f avg %.2f max %.2f\n\t", |
| str, lat->min, avg, lat->max); |
| |
| for (i = 0 ; i < DEVIATIONS ; i++) { |
| fprintf(stderr, " %.0f < %d", lat->deviations[i], deviations[i]); |
| total_counted += lat->deviations[i]; |
| } |
| if (total_counted && lat->total_io - total_counted) |
| fprintf(stderr, " < %.0f", lat->total_io - total_counted); |
| fprintf(stderr, "\n"); |
| memset(lat, 0, sizeof(*lat)); |
| } |
| |
| static void print_latency(struct thread_info *t) |
| { |
| struct io_latency *lat = &t->io_submit_latency; |
| print_lat("latency", lat); |
| } |
| |
| static void print_completion_latency(struct thread_info *t) |
| { |
| struct io_latency *lat = &t->io_completion_latency; |
| print_lat("completion latency", lat); |
| } |
| |
| /* |
| * updates the fields in the io operation struct that belongs to this |
| * io unit, and makes the io unit reusable again |
| */ |
| void finish_io(struct thread_info *t, struct io_unit *io, long result, |
| struct timeval *tv_now) { |
| struct io_oper *oper = io->io_oper; |
| |
| calc_latency(&io->io_start_time, tv_now, &t->io_completion_latency); |
| io->res = result; |
| io->busy = IO_FREE; |
| io->next = t->free_ious; |
| t->free_ious = io; |
| oper->num_pending--; |
| t->num_global_pending--; |
| check_finished_io(io); |
| if (oper->num_pending == 0 && |
| (oper->started_ios == oper->total_ios || oper->stonewalled)) |
| { |
| print_time(oper); |
| } |
| } |
| |
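| /* reap completed events from the aio context and pass each one to finish_io */ |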
| int read_some_events(struct thread_info *t) { |
| struct io_unit *event_io; |
| struct io_event *event; |
| int nr; |
| int i; |
| int min_nr = io_iter; |
| struct timeval stop_time; |
| |
| if (t->num_global_pending < io_iter) |
| min_nr = t->num_global_pending; |
| |
| #ifdef NEW_GETEVENTS |
| nr = io_getevents(t->io_ctx, min_nr, t->num_global_events, t->events,NULL); |
| #else |
| nr = io_getevents(t->io_ctx, t->num_global_events, t->events, NULL); |
| #endif |
| if (nr <= 0) |
| return nr; |
| |
| gettimeofday(&stop_time, NULL); |
| for (i = 0 ; i < nr ; i++) { |
| event = t->events + i; |
| event_io = (struct io_unit *)((unsigned long)event->obj); |
| finish_io(t, event_io, event->res, &stop_time); |
| } |
| return nr; |
| } |
| |
| /* |
| * finds a free io unit, waiting for pending requests if required. returns |
| * null if none could be found |
| */ |
| static struct io_unit *find_iou(struct thread_info *t, struct io_oper *oper) |
| { |
| struct io_unit *event_io; |
| int nr; |
| |
| retry: |
| if (t->free_ious) { |
| event_io = t->free_ious; |
| t->free_ious = t->free_ious->next; |
| if (grab_iou(event_io, oper)) { |
| fprintf(stderr, "io unit on free list but not free\n"); |
| abort(); |
| } |
| return event_io; |
| } |
| nr = read_some_events(t); |
| if (nr > 0) |
| goto retry; |
| else |
| fprintf(stderr, "no free ious after read_some_events\n"); |
| return NULL; |
| } |
| |
| /* |
| * wait for all pending requests for this io operation to finish |
| */ |
| static int io_oper_wait(struct thread_info *t, struct io_oper *oper) { |
| struct io_event event; |
| struct io_unit *event_io; |
| |
| if (oper == NULL) { |
| return 0; |
| } |
| |
| if (oper->num_pending == 0) |
| goto done; |
| |
| /* this func is not speed sensitive, no need to go wild reading |
| * more than one event at a time |
| */ |
| #ifdef NEW_GETEVENTS |
| while(io_getevents(t->io_ctx, 1, 1, &event, NULL) > 0) { |
| #else |
| while(io_getevents(t->io_ctx, 1, &event, NULL) > 0) { |
| #endif |
| struct timeval tv_now; |
| event_io = (struct io_unit *)((unsigned long)event.obj); |
| |
| gettimeofday(&tv_now, NULL); |
| finish_io(t, event_io, event.res, &tv_now); |
| |
| if (oper->num_pending == 0) |
| break; |
| } |
| done: |
| if (oper->num_err) { |
| fprintf(stderr, "%u errors on oper, last %u\n", |
| oper->num_err, oper->last_err); |
| } |
| return 0; |
| } |
| |
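| /* pick a page aligned random offset inside the operation's start/end range */ |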
| off_t random_byte_offset(struct io_oper *oper) { |
| off_t num; |
| off_t rand_byte = oper->start; |
| off_t range; |
| off_t offset = 1; |
| |
| range = (oper->end - oper->start) / (1024 * 1024); |
| if ((page_size_mask+1) > (1024 * 1024)) |
| offset = (page_size_mask+1) / (1024 * 1024); |
| if (range < offset) |
| range = 0; |
| else |
| range -= offset; |
| |
| /* find a random mb offset */ |
| num = 1 + (int)((double)range * rand() / (RAND_MAX + 1.0 )); |
| rand_byte += num * 1024 * 1024; |
| |
| /* find a random byte offset */ |
| num = 1 + (int)((double)(1024 * 1024) * rand() / (RAND_MAX + 1.0)); |
| |
| /* page align */ |
| num = (num + page_size_mask) & ~page_size_mask; |
| rand_byte += num; |
| |
| if (rand_byte + oper->reclen > oper->end) { |
| rand_byte -= oper->reclen; |
| } |
| return rand_byte; |
| } |
| |
| /* |
| * build an aio iocb for an operation, based on oper->rw and the |
| * last offset used. This finds the struct io_unit that will be attached |
| * to the iocb, and things are ready for submission to aio after this |
| * is called. |
| * |
| * returns null on error |
| */ |
| static struct io_unit *build_iocb(struct thread_info *t, struct io_oper *oper) |
| { |
| struct io_unit *io; |
| off_t rand_byte; |
| |
| io = find_iou(t, oper); |
| if (!io) { |
| fprintf(stderr, "unable to find io unit\n"); |
| return NULL; |
| } |
| |
| switch(oper->rw) { |
| case WRITE: |
| io_prep_pwrite(&io->iocb,oper->fd, io->buf, oper->reclen, |
| oper->last_offset); |
| oper->last_offset += oper->reclen; |
| break; |
| case READ: |
| io_prep_pread(&io->iocb,oper->fd, io->buf, oper->reclen, |
| oper->last_offset); |
| oper->last_offset += oper->reclen; |
| break; |
| case RREAD: |
| rand_byte = random_byte_offset(oper); |
| oper->last_offset = rand_byte; |
| io_prep_pread(&io->iocb,oper->fd, io->buf, oper->reclen, |
| rand_byte); |
| break; |
| case RWRITE: |
| rand_byte = random_byte_offset(oper); |
| oper->last_offset = rand_byte; |
| io_prep_pwrite(&io->iocb,oper->fd, io->buf, oper->reclen, |
| rand_byte); |
| |
| break; |
| } |
| |
| return io; |
| } |
| |
| /* |
| * wait for any pending requests, and then free all ram associated with |
| * an operation. returns the last error the operation hit (zero means none) |
| */ |
| static int |
| finish_oper(struct thread_info *t, struct io_oper *oper) |
| { |
| unsigned long last_err; |
| |
| io_oper_wait(t, oper); |
| last_err = oper->last_err; |
| if (oper->num_pending > 0) { |
| fprintf(stderr, "oper num_pending is %d\n", oper->num_pending); |
| } |
| close(oper->fd); |
| free(oper); |
| return last_err; |
| } |
| |
| /* |
| * allocates an io operation and fills in all the fields. returns |
| * null on error |
| */ |
| static struct io_oper * |
| create_oper(int fd, int rw, off_t start, off_t end, int reclen, int depth, |
| int iter, char *file_name) |
| { |
| struct io_oper *oper; |
| |
| oper = malloc (sizeof(*oper)); |
| if (!oper) { |
| fprintf(stderr, "unable to allocate io oper\n"); |
| return NULL; |
| } |
| memset(oper, 0, sizeof(*oper)); |
| |
| oper->depth = depth; |
| oper->start = start; |
| oper->end = end; |
| oper->last_offset = oper->start; |
| oper->fd = fd; |
| oper->reclen = reclen; |
| oper->rw = rw; |
| oper->total_ios = (oper->end - oper->start) / oper->reclen; |
| oper->file_name = file_name; |
| |
| return oper; |
| } |
| |
| /* |
| * does setup on num_ios worth of iocbs, but does not actually |
| * start any io |
| */ |
| int build_oper(struct thread_info *t, struct io_oper *oper, int num_ios, |
| struct iocb **my_iocbs) |
| { |
| int i; |
| struct io_unit *io; |
| |
| if (oper->started_ios == 0) |
| gettimeofday(&oper->start_time, NULL); |
| |
| if (num_ios == 0) |
| num_ios = oper->total_ios; |
| |
| if ((oper->started_ios + num_ios) > oper->total_ios) |
| num_ios = oper->total_ios - oper->started_ios; |
| |
| for( i = 0 ; i < num_ios ; i++) { |
| io = build_iocb(t, oper); |
| if (!io) { |
| return -1; |
| } |
| my_iocbs[i] = &io->iocb; |
| } |
| return num_ios; |
| } |
| |
| /* |
| * runs through the iocbs in the array provided and updates |
| * counters in the associated oper struct |
| */ |
| static void update_iou_counters(struct iocb **my_iocbs, int nr, |
| struct timeval *tv_now) |
| { |
| struct io_unit *io; |
| int i; |
| for (i = 0 ; i < nr ; i++) { |
| io = (struct io_unit *)(my_iocbs[i]); |
| io->io_oper->num_pending++; |
| io->io_oper->started_ios++; |
| io->io_start_time = *tv_now; /* set time of io_submit */ |
| } |
| } |
| |
| /* starts some io for a given file, returns zero if all went well */ |
| int run_built(struct thread_info *t, int num_ios, struct iocb **my_iocbs) |
| { |
| int ret; |
| struct timeval start_time; |
| struct timeval stop_time; |
| |
| resubmit: |
| gettimeofday(&start_time, NULL); |
| ret = io_submit(t->io_ctx, num_ios, my_iocbs); |
| gettimeofday(&stop_time, NULL); |
| calc_latency(&start_time, &stop_time, &t->io_submit_latency); |
| |
| if (ret != num_ios) { |
| /* some ios got through */ |
| if (ret > 0) { |
| update_iou_counters(my_iocbs, ret, &stop_time); |
| my_iocbs += ret; |
| t->num_global_pending += ret; |
| num_ios -= ret; |
| } |
| /* |
| * we've used up all the requests allocated in io_queue_init, reap |
| * some completed events and retry |
| */ |
| if (ret > 0 || ret == -EAGAIN) { |
| int old_ret = ret; |
| ret = read_some_events(t); |
| if (ret > 0) { |
| goto resubmit; |
| } else { |
| fprintf(stderr, "ret was %d and now is %d\n", old_ret, ret); |
| abort(); |
| } |
| } |
| |
| fprintf(stderr, "ret %d (%s) on io_submit\n", ret, strerror(-ret)); |
| return -1; |
| } |
| update_iou_counters(my_iocbs, ret, &stop_time); |
| t->num_global_pending += ret; |
| return 0; |
| } |
| |
| /* |
| * changes oper->rw to the next stage in the command sequence, or returns |
| * zero when this operation is completely finished |
| */ |
| static int restart_oper(struct io_oper *oper) { |
| int new_rw = 0; |
| if (oper->last_err) |
| return 0; |
| |
| /* this switch falls through */ |
| switch(oper->rw) { |
| case WRITE: |
| if (stages & (1 << READ)) |
| new_rw = READ; |
| case READ: |
| if (!new_rw && stages & (1 << RWRITE)) |
| new_rw = RWRITE; |
| case RWRITE: |
| if (!new_rw && stages & (1 << RREAD)) |
| new_rw = RREAD; |
| } |
| |
| if (new_rw) { |
| oper->started_ios = 0; |
| oper->last_offset = oper->start; |
| oper->stonewalled = 0; |
| |
| /* |
| * we're restarting an operation with pending requests, so the |
| * timing info won't be printed by finish_io; print it here instead |
| */ |
| if (oper->num_pending) |
| print_time(oper); |
| |
| oper->rw = new_rw; |
| return 1; |
| } |
| return 0; |
| } |
| |
| static int oper_runnable(struct io_oper *oper) { |
| struct stat buf; |
| int ret; |
| |
| /* the first context is always runnable; if started_ios > 0 there is |
| * no need to redo the calculations |
| */ |
| if (oper->started_ios || oper->start == 0) |
| return 1; |
| /* only the sequential phases force delays in starting */ |
| if (oper->rw >= RWRITE) |
| return 1; |
| ret = fstat(oper->fd, &buf); |
| if (ret < 0) { |
| perror("fstat"); |
| exit(1); |
| } |
| if (S_ISREG(buf.st_mode) && buf.st_size < oper->start) |
| return 0; |
| return 1; |
| } |
| |
| /* |
| * runs through all the io operations on the active list, and starts |
| * a chunk of io on each. If any io operations are completely finished, |
| * it either switches them to the next stage or puts them on the |
| * finished list. |
| * |
| * this function stops after max_io_submit iocbs are sent down the |
| * pipe, even if it has not yet touched all the operations on the |
| * active list. Any operations that have finished are moved onto |
| * the finished_opers list. |
| */ |
| static int run_active_list(struct thread_info *t, |
| int io_iter, |
| int max_io_submit) |
| { |
| struct io_oper *oper; |
| struct io_oper *built_opers = NULL; |
| struct iocb **my_iocbs = t->iocbs; |
| int ret = 0; |
| int num_built = 0; |
| |
| oper = t->active_opers; |
| while(oper) { |
| if (!oper_runnable(oper)) { |
| oper = oper->next; |
| if (oper == t->active_opers) |
| break; |
| continue; |
| } |
| ret = build_oper(t, oper, io_iter, my_iocbs); |
| if (ret >= 0) { |
| my_iocbs += ret; |
| num_built += ret; |
| oper_list_del(oper, &t->active_opers); |
| oper_list_add(oper, &built_opers); |
| oper = t->active_opers; |
| if (num_built + io_iter > max_io_submit) |
| break; |
| } else |
| break; |
| } |
| if (num_built) { |
| ret = run_built(t, num_built, t->iocbs); |
| if (ret < 0) { |
| fprintf(stderr, "error %d on run_built\n", ret); |
| exit(1); |
| } |
| while(built_opers) { |
| oper = built_opers; |
| oper_list_del(oper, &built_opers); |
| oper_list_add(oper, &t->active_opers); |
| if (oper->started_ios == oper->total_ios) { |
| oper_list_del(oper, &t->active_opers); |
| oper_list_add(oper, &t->finished_opers); |
| } |
| } |
| } |
| return 0; |
| } |
| |
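| /* mark the sysv shm segment for removal (no-op unless -m shm was used) */ |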
| void drop_shm() { |
| int ret; |
| struct shmid_ds ds; |
| if (use_shm != USE_SHM) |
| return; |
| |
| ret = shmctl(shm_id, IPC_RMID, &ds); |
| if (ret) { |
| perror("shmctl IPC_RMID"); |
| } |
| } |
| |
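| /* create an aio context with room for n requests; exits on failure */ |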
| void aio_setup(io_context_t *io_ctx, int n) |
| { |
| int res = io_queue_init(n, io_ctx); |
| if (res != 0) { |
| fprintf(stderr, "io_queue_setup(%d) returned %d (%s)\n", |
| n, res, strerror(-res)); |
| exit(3); |
| } |
| } |
| |
| /* |
| * allocate io operation and event arrays for a given thread |
| */ |
| int setup_ious(struct thread_info *t, |
| int num_files, int depth, |
| int reclen, int max_io_submit) { |
| int i; |
| size_t bytes = num_files * depth * sizeof(*t->ios); |
| |
| t->ios = malloc(bytes); |
| if (!t->ios) { |
| fprintf(stderr, "unable to allocate io units\n"); |
| return -1; |
| } |
| memset(t->ios, 0, bytes); |
| |
| for (i = 0 ; i < depth * num_files; i++) { |
| t->ios[i].buf = aligned_buffer; |
| aligned_buffer += padded_reclen; |
| t->ios[i].buf_size = reclen; |
| if (verify) |
| memset(t->ios[i].buf, 'b', reclen); |
| else |
| memset(t->ios[i].buf, 0, reclen); |
| t->ios[i].next = t->free_ious; |
| t->free_ious = t->ios + i; |
| } |
| if (verify) { |
| verify_buf = aligned_buffer; |
| memset(verify_buf, 'b', reclen); |
| } |
| |
| t->iocbs = malloc(sizeof(struct iocb *) * max_io_submit); |
| if (!t->iocbs) { |
| fprintf(stderr, "unable to allocate iocbs\n"); |
| goto free_buffers; |
| } |
| |
| memset(t->iocbs, 0, max_io_submit * sizeof(struct iocb *)); |
| |
| t->events = malloc(sizeof(struct io_event) * depth * num_files); |
| if (!t->events) { |
| fprintf(stderr, "unable to allocate ram for events\n"); |
| goto free_buffers; |
| } |
| memset(t->events, 0, num_files * sizeof(struct io_event)*depth); |
| |
| t->num_global_ios = num_files * depth; |
| t->num_global_events = t->num_global_ios; |
| return 0; |
| |
| free_buffers: |
| if (t->ios) |
| free(t->ios); |
| if (t->iocbs) |
| free(t->iocbs); |
| if (t->events) |
| free(t->events); |
| return -1; |
| } |
| |
| /* |
| * The buffers used for file data are allocated as a single big |
| * malloc, and then each thread and operation takes a piece and uses |
| * that for file data. This lets us do one large shm or bigpages |
| * allocation without having to find a special place in each thread to |
| * map the buffers to |
| */ |
| int setup_shared_mem(int num_threads, int num_files, int depth, |
| int reclen, int max_io_submit) |
| { |
| char *p = NULL; |
| size_t total_ram; |
| |
| padded_reclen = (reclen + page_size_mask) / (page_size_mask+1); |
| padded_reclen = padded_reclen * (page_size_mask+1); |
| total_ram = num_files * depth * padded_reclen + num_threads; |
| if (verify) |
| total_ram += padded_reclen; |
| |
| if (use_shm == USE_MALLOC) { |
| p = malloc(total_ram + page_size_mask); |
| } else if (use_shm == USE_SHM) { |
| shm_id = shmget(IPC_PRIVATE, total_ram, IPC_CREAT | 0700); |
| if (shm_id < 0) { |
| perror("shmget"); |
| drop_shm(); |
| goto free_buffers; |
| } |
| p = shmat(shm_id, (char *)0x50000000, 0); |
| if ((long)p == -1) { |
| perror("shmat"); |
| goto free_buffers; |
| } |
| /* won't really be dropped until we shmdt */ |
| drop_shm(); |
| } else if (use_shm == USE_SHMFS) { |
| char mmap_name[16]; /* "/dev/shm/XXXXXX" plus the terminating nul */ |
| int fd; |
| |
| strcpy(mmap_name, "/dev/shm/XXXXXX"); |
| fd = mkstemp(mmap_name); |
| if (fd < 0) { |
| perror("mkstemp"); |
| goto free_buffers; |
| } |
| unlink(mmap_name); |
| ftruncate(fd, total_ram); |
| shm_id = fd; |
| p = mmap((char *)0x50000000, total_ram, |
| PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); |
| |
| if (p == MAP_FAILED) { |
| perror("mmap"); |
| goto free_buffers; |
| } |
| } |
| if (!p) { |
| fprintf(stderr, "unable to allocate buffers\n"); |
| goto free_buffers; |
| } |
| unaligned_buffer = p; |
| p = (char*)((intptr_t) (p + page_size_mask) & ~page_size_mask); |
| aligned_buffer = p; |
| return 0; |
| |
| free_buffers: |
| drop_shm(); |
| if (unaligned_buffer) |
| free(unaligned_buffer); |
| return -1; |
| } |
| |
| /* |
| * runs through all the thread_info structs and calculates a combined |
| * throughput |
| */ |
| void global_thread_throughput(struct thread_info *t, char *this_stage) { |
| int i; |
| double runtime = time_since_now(&global_stage_start_time); |
| double total_mb = 0; |
| double min_trans = 0; |
| |
| for (i = 0 ; i < num_threads ; i++) { |
| total_mb += global_thread_info[i].stage_mb_trans; |
| if (!min_trans || global_thread_info[i].stage_mb_trans < min_trans) |
| min_trans = global_thread_info[i].stage_mb_trans; |
| } |
| if (total_mb) { |
| fprintf(stderr, "%s throughput (%.2f MB/s) ", this_stage, |
| total_mb / runtime); |
| fprintf(stderr, "%.2f MB in %.2fs", total_mb, runtime); |
| if (stonewall) |
| fprintf(stderr, " min transfer %.2fMB", min_trans); |
| fprintf(stderr, "\n"); |
| } |
| } |
| |
| |
| /* this is the meat of the state machine. There is a list of |
| * active operation structs, and as each one finishes the required |
| * io it is moved to a list of finished operations. Once they have |
| * all finished whatever stage they were in, they are given the chance |
| * to restart and pick a different stage (read/write/random read etc) |
| * |
| * various timings are printed in between the stages, along with |
| * thread synchronization if there is more than one thread. |
| */ |
| int worker(struct thread_info *t) |
| { |
| struct io_oper *oper; |
| char *this_stage = NULL; |
| struct timeval stage_time; |
| int status = 0; |
| int iteration = 0; |
| int cnt; |
| |
| aio_setup(&t->io_ctx, 512); |
| |
| restart: |
| if (num_threads > 1) { |
| pthread_mutex_lock(&stage_mutex); |
| threads_starting++; |
| if (threads_starting == num_threads) { |
| threads_ending = 0; |
| gettimeofday(&global_stage_start_time, NULL); |
| pthread_cond_broadcast(&stage_cond); |
| } |
| while (threads_starting != num_threads) |
| pthread_cond_wait(&stage_cond, &stage_mutex); |
| pthread_mutex_unlock(&stage_mutex); |
| } |
| if (t->active_opers) { |
| this_stage = stage_name(t->active_opers->rw); |
| gettimeofday(&stage_time, NULL); |
| t->stage_mb_trans = 0; |
| } |
| |
| cnt = 0; |
| /* first we send everything through aio */ |
| while(t->active_opers && (cnt < iterations || iterations == RUN_FOREVER)) { |
| if (stonewall && threads_ending) { |
| oper = t->active_opers; |
| oper->stonewalled = 1; |
| oper_list_del(oper, &t->active_opers); |
| oper_list_add(oper, &t->finished_opers); |
| } else { |
| run_active_list(t, io_iter, max_io_submit); |
| } |
| cnt++; |
| } |
| if (latency_stats) |
| print_latency(t); |
| |
| if (completion_latency_stats) |
| print_completion_latency(t); |
| |
| /* then we wait for all the operations to finish */ |
| oper = t->finished_opers; |
| do { |
| if (!oper) |
| break; |
| io_oper_wait(t, oper); |
| oper = oper->next; |
| } while(oper != t->finished_opers); |
| |
| /* then we do an fsync to get the timing for any future operations |
| * right, and check to see if any of these need to get restarted |
| */ |
| oper = t->finished_opers; |
| while(oper) { |
| if (fsync_stages) |
| fsync(oper->fd); |
| t->stage_mb_trans += oper_mb_trans(oper); |
| if (restart_oper(oper)) { |
| oper_list_del(oper, &t->finished_opers); |
| oper_list_add(oper, &t->active_opers); |
| oper = t->finished_opers; |
| continue; |
| } |
| oper = oper->next; |
| if (oper == t->finished_opers) |
| break; |
| } |
| |
| if (t->stage_mb_trans && t->num_files > 0) { |
| double seconds = time_since_now(&stage_time); |
| fprintf(stderr, "thread %llu %s totals (%.2f MB/s) %.2f MB in %.2fs\n", |
| (unsigned long long)(t - global_thread_info), this_stage, |
| t->stage_mb_trans/seconds, t->stage_mb_trans, seconds); |
| } |
| |
| if (num_threads > 1) { |
| pthread_mutex_lock(&stage_mutex); |
| threads_ending++; |
| if (threads_ending == num_threads) { |
| threads_starting = 0; |
| pthread_cond_broadcast(&stage_cond); |
| global_thread_throughput(t, this_stage); |
| } |
| while(threads_ending != num_threads) |
| pthread_cond_wait(&stage_cond, &stage_mutex); |
| pthread_mutex_unlock(&stage_mutex); |
| } |
| |
| /* someone got restarted, go back to the beginning */ |
| if (t->active_opers && (cnt < iterations || iterations == RUN_FOREVER)) { |
| iteration++; |
| goto restart; |
| } |
| |
| /* finally, free all the ram */ |
| while(t->finished_opers) { |
| oper = t->finished_opers; |
| oper_list_del(oper, &t->finished_opers); |
| status = finish_oper(t, oper); |
| } |
| |
| if (t->num_global_pending) { |
| fprintf(stderr, "global num pending is %d\n", t->num_global_pending); |
| } |
| io_queue_release(t->io_ctx); |
| |
| return status; |
| } |
| |
| typedef void * (*start_routine)(void *); |
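| /* start one worker thread per thread_info struct and wait for them all to exit */ |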
| int run_workers(struct thread_info *t, int num_threads) |
| { |
| int ret; |
| int i; |
| |
| for(i = 0 ; i < num_threads ; i++) { |
| ret = pthread_create(&t[i].tid, NULL, (start_routine)worker, t + i); |
| if (ret) { |
| perror("pthread_create"); |
| exit(1); |
| } |
| } |
| for(i = 0 ; i < num_threads ; i++) { |
| ret = pthread_join(t[i].tid, NULL); |
| if (ret) { |
| perror("pthread_join"); |
| exit(1); |
| } |
| } |
| return 0; |
| } |
| |
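| /* parse a size argument with an optional b/k/m/g suffix; mult is the default unit */ |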
| off_t parse_size(char *size_arg, off_t mult) { |
| char c; |
| int num; |
| off_t ret; |
| c = size_arg[strlen(size_arg) - 1]; |
| if (c > '9') { |
| size_arg[strlen(size_arg) - 1] = '\0'; |
| } |
| num = atoi(size_arg); |
| switch(c) { |
| case 'g': |
| case 'G': |
| mult = 1024 * 1024 * 1024; |
| break; |
| case 'm': |
| case 'M': |
| mult = 1024 * 1024; |
| break; |
| case 'k': |
| case 'K': |
| mult = 1024; |
| break; |
| case 'b': |
| case 'B': |
| mult = 1; |
| break; |
| } |
| ret = mult * num; |
| return ret; |
| } |
| |
| void print_usage(void) { |
| printf("usage: aio-stress [-s size] [-r size] [-a size] [-d num] [-b num]\n"); |
| printf(" [-i num] [-t num] [-c num] [-C size] [-nxhOS ]\n"); |
| printf(" file1 [file2 ...]\n"); |
| printf("\t-a size in KB at which to align buffers\n"); |
| printf("\t-b max number of iocbs to give io_submit at once\n"); |
| printf("\t-c number of io contexts per file\n"); |
| printf("\t-C offset between contexts, default 2MB\n"); |
| printf("\t-s size in MB of the test file(s), default 1024MB\n"); |
| printf("\t-r record size in KB used for each io, default 64KB\n"); |
| printf("\t-d number of pending aio requests for each file, default 64\n"); |
| printf("\t-i number of ios per file sent before switching\n\t to the next file, default 8\n"); |
| printf("\t-I total number of ayncs IOs the program will run, default is run until Cntl-C\n"); |
| printf("\t-O Use O_DIRECT (not available in 2.4 kernels),\n"); |
| printf("\t-S Use O_SYNC for writes\n"); |
| printf("\t-o add an operation to the list: write=0, read=1,\n"); |
| printf("\t random write=2, random read=3.\n"); |
| printf("\t repeat -o to specify multiple ops: -o 0 -o 1 etc.\n"); |
| printf("\t-m shm use ipc shared memory for io buffers instead of malloc\n"); |
| printf("\t-m shmfs mmap a file in /dev/shm for io buffers\n"); |
| printf("\t-n no fsyncs between write stage and read stage\n"); |
| printf("\t-l print io_submit latencies after each stage\n"); |
| printf("\t-L print io completion latencies after each stage\n"); |
| printf("\t-t number of threads to run\n"); |
| printf("\t-u unlink files after completion\n"); |
| printf("\t-v verification of bytes written\n"); |
| printf("\t-x turn off thread stonewalling\n"); |
| printf("\t-h this message\n"); |
| printf("\n\t the size options (-a -s and -r) allow modifiers -s 400{k,m,g}\n"); |
| printf("\t translate to 400KB, 400MB and 400GB\n"); |
| printf("version %s\n", PROG_VERSION); |
| } |
| |
| int main(int ac, char **av) |
| { |
| int rwfd; |
| int i; |
| int j; |
| int c; |
| |
| off_t file_size = 1 * 1024 * 1024 * 1024; |
| int first_stage = WRITE; |
| struct io_oper *oper; |
| int status = 0; |
| int num_files = 0; |
| int open_fds = 0; |
| struct thread_info *t; |
| |
| page_size_mask = getpagesize() - 1; |
| |
| while(1) { |
| c = getopt(ac, av, "a:b:c:C:m:s:r:d:i:I:o:t:lLnhOSxvu"); |
| if (c < 0) |
| break; |
| |
| switch(c) { |
| case 'a': |
| page_size_mask = parse_size(optarg, 1024); |
| page_size_mask--; |
| break; |
| case 'c': |
| num_contexts = atoi(optarg); |
| break; |
| case 'C': |
| context_offset = parse_size(optarg, 1024 * 1024); |
| break; |
| case 'b': |
| max_io_submit = atoi(optarg); |
| break; |
| case 's': |
| file_size = parse_size(optarg, 1024 * 1024); |
| break; |
| case 'd': |
| depth = atoi(optarg); |
| break; |
| case 'r': |
| rec_len = parse_size(optarg, 1024); |
| break; |
| case 'i': |
| io_iter = atoi(optarg); |
| break; |
| case 'I': |
| iterations = atoi(optarg); |
| break; |
| case 'n': |
| fsync_stages = 0; |
| break; |
| case 'l': |
| latency_stats = 1; |
| break; |
| case 'L': |
| completion_latency_stats = 1; |
| break; |
| case 'm': |
| if (!strcmp(optarg, "shm")) { |
| fprintf(stderr, "using ipc shm\n"); |
| use_shm = USE_SHM; |
| } else if (!strcmp(optarg, "shmfs")) { |
| fprintf(stderr, "using /dev/shm for buffers\n"); |
| use_shm = USE_SHMFS; |
| } |
| break; |
| case 'o': |
| i = atoi(optarg); |
| stages |= 1 << i; |
| fprintf(stderr, "adding stage %s\n", stage_name(i)); |
| break; |
| case 'O': |
| o_direct = O_DIRECT; |
| break; |
| case 'S': |
| o_sync = O_SYNC; |
| break; |
| case 't': |
| num_threads = atoi(optarg); |
| break; |
| case 'x': |
| stonewall = 0; |
| break; |
| case 'u': |
| unlink_files = 1; |
| break; |
| case 'v': |
| verify = 1; |
| break; |
| case 'h': |
| default: |
| print_usage(); |
| exit(1); |
| } |
| } |
| |
| /* |
| * make sure we don't try to submit more ios than we have allocated |
| * memory for |
| */ |
| if (depth < io_iter) { |
| io_iter = depth; |
| fprintf(stderr, "dropping io_iter to %d\n", io_iter); |
| } |
| |
| if (optind >= ac) { |
| print_usage(); |
| exit(1); |
| } |
| |
| num_files = ac - optind; |
| |
| if (num_threads > (num_files * num_contexts)) { |
| num_threads = num_files * num_contexts; |
| fprintf(stderr, "dropping thread count to the number of contexts %d\n", |
| num_threads); |
| } |
| |
| t = calloc(num_threads, sizeof(*t)); |
| if (!t) { |
| perror("calloc"); |
| exit(1); |
| } |
| global_thread_info = t; |
| |
| /* by default, allow a huge number of iocbs to be sent towards |
| * io_submit |
| */ |
| if (!max_io_submit) |
| max_io_submit = num_files * io_iter * num_contexts; |
| |
| /* |
| * make sure we don't try to submit more ios than max_io_submit allows |
| */ |
| if (max_io_submit < io_iter) { |
| io_iter = max_io_submit; |
| fprintf(stderr, "dropping io_iter to %d\n", io_iter); |
| } |
| |
| if (!stages) { |
| stages = (1 << WRITE) | (1 << READ) | (1 << RREAD) | (1 << RWRITE); |
| } else { |
| for (i = 0 ; i < LAST_STAGE; i++) { |
| if (stages & (1 << i)) { |
| first_stage = i; |
| fprintf(stderr, "starting with %s\n", stage_name(i)); |
| break; |
| } |
| } |
| } |
| |
| if (file_size < num_contexts * context_offset) { |
| fprintf(stderr, "file size %Lu too small for %d contexts\n", |
| (unsigned long long)file_size, num_contexts); |
| exit(1); |
| } |
| |
| fprintf(stderr, "file size %LuMB, record size %luKB, depth %d, ios per iteration %d\n", |
| (unsigned long long)file_size / (1024 * 1024), |
| rec_len / 1024, depth, io_iter); |
| fprintf(stderr, "max io_submit %d, buffer alignment set to %luKB\n", |
| max_io_submit, (page_size_mask + 1)/1024); |
| fprintf(stderr, "threads %d files %d contexts %d context offset %LuMB verification %s\n", |
| num_threads, num_files, num_contexts, |
| (unsigned long long)context_offset / (1024 * 1024), |
| verify ? "on" : "off"); |
| /* open all the files and do any required setup for them */ |
| for (i = optind ; i < ac ; i++) { |
| int thread_index; |
| for (j = 0 ; j < num_contexts ; j++) { |
| thread_index = open_fds % num_threads; |
| open_fds++; |
| |
| rwfd = open(av[i], O_CREAT | O_RDWR | o_direct | o_sync, 0600); |
| assert(rwfd != -1); |
| |
| oper = create_oper(rwfd, first_stage, j * context_offset, |
| file_size - j * context_offset, rec_len, |
| depth, io_iter, av[i]); |
| if (!oper) { |
| fprintf(stderr, "error in create_oper\n"); |
| exit(-1); |
| } |
| oper_list_add(oper, &t[thread_index].active_opers); |
| t[thread_index].num_files++; |
| } |
| } |
| if (setup_shared_mem(num_threads, num_files * num_contexts, |
| depth, rec_len, max_io_submit)) |
| { |
| exit(1); |
| } |
| for (i = 0 ; i < num_threads ; i++) { |
| if (setup_ious(&t[i], t[i].num_files, depth, rec_len, max_io_submit)) |
| exit(1); |
| } |
| if (num_threads > 1){ |
| printf("Running multi thread version num_threads:%d\n", num_threads); |
| run_workers(t, num_threads); |
| } else { |
| printf("Running single thread version \n"); |
| status = worker(t); |
| } |
| if (unlink_files) { |
| for (i = optind ; i < ac ; i++) { |
| printf("Cleaning up file %s \n", av[i]); |
| unlink(av[i]); |
| } |
| } |
| |
| if (status) { |
| exit(1); |
| } |
| return status; |
| } |
| |
| |