From 804382981551068b5218612e51dfb91885a190c4 Mon Sep 17 00:00:00 2001 From: Antonio SJ Musumeci Date: Sat, 1 Jul 2017 23:29:47 -0400 Subject: [PATCH 1/2] fixed threads --- libfuse/lib/fuse_loop_mt.c | 72 +++++--------------------------------- 1 file changed, 9 insertions(+), 63 deletions(-) diff --git a/libfuse/lib/fuse_loop_mt.c b/libfuse/lib/fuse_loop_mt.c index 82e30014b..ad56ccae5 100644 --- a/libfuse/lib/fuse_loop_mt.c +++ b/libfuse/lib/fuse_loop_mt.c @@ -33,9 +33,6 @@ struct fuse_worker { }; struct fuse_mt { - pthread_mutex_t lock; - int numworker; - int numavail; struct fuse_session *se; struct fuse_chan *prevch; struct fuse_worker main; @@ -65,11 +62,10 @@ static int fuse_loop_start_thread(struct fuse_mt *mt); static void *fuse_do_work(void *data) { - struct fuse_worker *w = (struct fuse_worker *) data; - struct fuse_mt *mt = w->mt; + struct fuse_worker *w = (struct fuse_worker *) data; + struct fuse_mt *mt = w->mt; while (!fuse_session_exited(mt->se)) { - int isforget = 0; struct fuse_chan *ch = mt->prevch; struct fuse_buf fbuf = { .mem = w->buf, @@ -90,51 +86,10 @@ static void *fuse_do_work(void *data) break; } - pthread_mutex_lock(&mt->lock); - if (mt->exit) { - pthread_mutex_unlock(&mt->lock); + if (mt->exit) return NULL; - } - - /* - * This disgusting hack is needed so that zillions of threads - * are not created on a burst of FORGET messages - */ - if (!(fbuf.flags & FUSE_BUF_IS_FD)) { - struct fuse_in_header *in = fbuf.mem; - - if (in->opcode == FUSE_FORGET || - in->opcode == FUSE_BATCH_FORGET) - isforget = 1; - } - - if (!isforget) - mt->numavail--; - if (mt->numavail == 0) - fuse_loop_start_thread(mt); - pthread_mutex_unlock(&mt->lock); fuse_session_process_buf(mt->se, &fbuf, ch); - - pthread_mutex_lock(&mt->lock); - if (!isforget) - mt->numavail++; - if (mt->numavail > 10) { - if (mt->exit) { - pthread_mutex_unlock(&mt->lock); - return NULL; - } - list_del_worker(w); - mt->numavail--; - mt->numworker--; - pthread_mutex_unlock(&mt->lock); - - pthread_detach(w->thread_id); - free(w->buf); - free(w); - return NULL; - } - pthread_mutex_unlock(&mt->lock); } sem_post(&mt->finish); @@ -200,18 +155,14 @@ static int fuse_loop_start_thread(struct fuse_mt *mt) return -1; } list_add_worker(w, &mt->main); - mt->numavail ++; - mt->numworker ++; return 0; } -static void fuse_join_worker(struct fuse_mt *mt, struct fuse_worker *w) +static void fuse_join_worker(struct fuse_worker *w) { pthread_join(w->thread_id, NULL); - pthread_mutex_lock(&mt->lock); list_del_worker(w); - pthread_mutex_unlock(&mt->lock); free(w->buf); free(w); } @@ -226,34 +177,29 @@ int fuse_session_loop_mt(struct fuse_session *se) mt.se = se; mt.prevch = fuse_session_next_chan(se, NULL); mt.error = 0; - mt.numworker = 0; - mt.numavail = 0; mt.main.thread_id = pthread_self(); mt.main.prev = mt.main.next = &mt.main; sem_init(&mt.finish, 0, 0); - fuse_mutex_init(&mt.lock); - pthread_mutex_lock(&mt.lock); - err = fuse_loop_start_thread(&mt); - pthread_mutex_unlock(&mt.lock); + err = 0; + for(size_t i = 0; (i < 10) && !err; i++) + err = fuse_loop_start_thread(&mt); + if (!err) { /* sem_wait() is interruptible */ while (!fuse_session_exited(se)) sem_wait(&mt.finish); - pthread_mutex_lock(&mt.lock); for (w = mt.main.next; w != &mt.main; w = w->next) pthread_cancel(w->thread_id); mt.exit = 1; - pthread_mutex_unlock(&mt.lock); while (mt.main.next != &mt.main) - fuse_join_worker(&mt, mt.main.next); + fuse_join_worker(mt.main.next); err = mt.error; } - pthread_mutex_destroy(&mt.lock); sem_destroy(&mt.finish); fuse_session_reset(se); return err; From 75ed37a11a5977cb64a901d153831e51ce459191 Mon Sep 17 00:00:00 2001 From: Antonio SJ Musumeci Date: Sun, 2 Jul 2017 14:39:59 -0400 Subject: [PATCH 2/2] add setting of thread pool size --- README.md | 1 + libfuse/include/fuse.h | 4 +++- libfuse/include/fuse_lowlevel.h | 2 +- libfuse/lib/cuse_lowlevel.c | 2 +- libfuse/lib/fuse.c | 7 +++++++ libfuse/lib/fuse_loop_mt.c | 23 +++++++++++++++++++++-- libfuse/lib/fuse_mt.c | 6 ++++-- man/mergerfs.1 | 12 ++++++++++++ src/option_parser.cpp | 5 ++++- 9 files changed, 54 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 6d957ac13..81908c859 100644 --- a/README.md +++ b/README.md @@ -42,6 +42,7 @@ mergerfs -o<options> <srcmounts> <mountpoint> * **symlinkify_timeout**: time to wait, in seconds, to activate the **symlinkify** behavior. (default: 3600) * **nullrw**: turns reads and writes into no-ops. The request will succeed but do nothing. Useful for benchmarking mergerfs. (default: false) * **ignorepponrename**: ignore path preserving on rename. Typically rename and link act differently depending on the policy of `create` (read below). Enabling this will cause rename and link to always use the non-path preserving behavior. This means files, when renamed or linked, will stay on the same drive. +* **threads**: number of threads to use in multithreaded mode. When set to zero (the default) it will attempt to discover and use the number of logical cores. If the lookup fails it will fall back to using 4. If the thread count is set negative it will look up the number of cores then divide by the absolute value. ie. threads=-2 on an 8 core machine will result in 8 / 2 = 4 threads. There will always be at least 1 thread. NOTE: higher number of threads increases parallelism but usually decreases throughput. * **fsname**: sets the name of the filesystem as seen in **mount**, **df**, etc. Defaults to a list of the source paths concatenated together with the longest common prefix removed. * **func.<func>=<policy>**: sets the specific FUSE function's policy. See below for the list of value types. Example: **func.getattr=newest** * **category.<category>=<policy>**: Sets policy of all FUSE functions in the provided category. Example: **category.create=mfs** diff --git a/libfuse/include/fuse.h b/libfuse/include/fuse.h index 911a676c5..d65db8b6b 100644 --- a/libfuse/include/fuse.h +++ b/libfuse/include/fuse.h @@ -115,7 +115,7 @@ struct fuse_operations { */ int (*mknod) (const char *, mode_t, dev_t); - /** Create a directory + /** Create a directory * * Note that the mode argument may not have the type specification * bits set, i.e. S_ISDIR(mode) can be false. To obtain the @@ -696,6 +696,8 @@ int fuse_loop(struct fuse *f); */ void fuse_exit(struct fuse *f); +int fuse_config_num_threads(const struct fuse *f); + /** * FUSE event loop with multiple threads * diff --git a/libfuse/include/fuse_lowlevel.h b/libfuse/include/fuse_lowlevel.h index 8262d38a8..ee7c0dd53 100644 --- a/libfuse/include/fuse_lowlevel.h +++ b/libfuse/include/fuse_lowlevel.h @@ -1700,7 +1700,7 @@ int fuse_session_loop(struct fuse_session *se); * @param se the session * @return 0 on success, -1 on error */ -int fuse_session_loop_mt(struct fuse_session *se); +int fuse_session_loop_mt(struct fuse_session *se, const int threads); /* ----------------------------------------------------------- * * Channel interface * diff --git a/libfuse/lib/cuse_lowlevel.c b/libfuse/lib/cuse_lowlevel.c index 402cf4bd1..b3a535f1c 100644 --- a/libfuse/lib/cuse_lowlevel.c +++ b/libfuse/lib/cuse_lowlevel.c @@ -359,7 +359,7 @@ int cuse_lowlevel_main(int argc, char *argv[], const struct cuse_info *ci, return 1; if (multithreaded) - res = fuse_session_loop_mt(se); + res = fuse_session_loop_mt(se, 0); else res = fuse_session_loop(se); diff --git a/libfuse/lib/fuse.c b/libfuse/lib/fuse.c index fa0a8149b..1ee0ecb7f 100644 --- a/libfuse/lib/fuse.c +++ b/libfuse/lib/fuse.c @@ -77,6 +77,7 @@ struct fuse_config { int intr_signal; int help; char *modules; + int threads; }; struct fuse_fs { @@ -4405,6 +4406,7 @@ static const struct fuse_opt fuse_lib_opts[] = { FUSE_LIB_OPT("intr", intr, 1), FUSE_LIB_OPT("intr_signal=%d", intr_signal, 0), FUSE_LIB_OPT("modules=%s", modules, 0), + FUSE_LIB_OPT("threads=%d", threads, 0), FUSE_OPT_END }; @@ -4900,3 +4902,8 @@ struct fuse *fuse_new_compat25(int fd, struct fuse_args *args, } FUSE_SYMVER(".symver fuse_new_compat25,fuse_new@FUSE_2.5"); + +int fuse_config_num_threads(const struct fuse *f) +{ + return f->conf.threads; +} diff --git a/libfuse/lib/fuse_loop_mt.c b/libfuse/lib/fuse_loop_mt.c index ad56ccae5..ce29b490b 100644 --- a/libfuse/lib/fuse_loop_mt.c +++ b/libfuse/lib/fuse_loop_mt.c @@ -19,6 +19,7 @@ #include #include #include +#include /* Environment var controlling the thread stack size */ #define ENVNAME_THREAD_STACK "FUSE_THREAD_STACK" @@ -167,9 +168,21 @@ static void fuse_join_worker(struct fuse_worker *w) free(w); } -int fuse_session_loop_mt(struct fuse_session *se) +static int number_of_threads(void) +{ +#ifdef _SC_NPROCESSORS_ONLN + return sysconf(_SC_NPROCESSORS_ONLN); +#endif + + return 4; +} + +int fuse_session_loop_mt(struct fuse_session *se, + const int _threads) { + int i; int err; + int threads; struct fuse_mt mt; struct fuse_worker *w; @@ -181,8 +194,14 @@ int fuse_session_loop_mt(struct fuse_session *se) mt.main.prev = mt.main.next = &mt.main; sem_init(&mt.finish, 0, 0); + threads = ((_threads > 0) ? _threads : number_of_threads()); + if(_threads < 0) + threads /= -_threads; + if(threads == 0) + threads = 1; + err = 0; - for(size_t i = 0; (i < 10) && !err; i++) + for(i = 0; (i < threads) && !err; i++) err = fuse_loop_start_thread(&mt); if (!err) { diff --git a/libfuse/lib/fuse_mt.c b/libfuse/lib/fuse_mt.c index f6dbe71b2..6cc30344a 100644 --- a/libfuse/lib/fuse_mt.c +++ b/libfuse/lib/fuse_mt.c @@ -100,7 +100,8 @@ int fuse_loop_mt_proc(struct fuse *f, fuse_processor_t proc, void *data) return -1; } fuse_session_add_chan(se, ch); - res = fuse_session_loop_mt(se); + res = fuse_session_loop_mt(se, + fuse_config_num_threads(f)); fuse_session_destroy(se); return res; } @@ -114,7 +115,8 @@ int fuse_loop_mt(struct fuse *f) if (res) return -1; - res = fuse_session_loop_mt(fuse_get_session(f)); + res = fuse_session_loop_mt(fuse_get_session(f), + fuse_config_num_threads(f)); fuse_stop_cleanup_thread(f); return res; } diff --git a/man/mergerfs.1 b/man/mergerfs.1 index bdf62cfb7..a6d814ad6 100644 --- a/man/mergerfs.1 +++ b/man/mergerfs.1 @@ -103,6 +103,18 @@ Enabling this will cause rename and link to always use the non\-path preserving behavior. This means files, when renamed or linked, will stay on the same drive. .IP \[bu] 2 +\f[B]threads\f[]: number of threads to use in multithreaded mode. +When set to zero (the default) it will attempt to discover and use the +number of logical cores. +If the lookup fails it will fall back to using 4. +If the thread count is set negative it will look up the number of cores +then divide by the absolute value. +ie. +threads=\-2 on an 8 core machine will result in 8 / 2 = 4 threads. +There will always be at least 1 thread. +NOTE: higher number of threads increases parallelism but usually +decreases throughput. +.IP \[bu] 2 \f[B]fsname\f[]: sets the name of the filesystem as seen in \f[B]mount\f[], \f[B]df\f[], etc. Defaults to a list of the source paths concatenated together with the diff --git a/src/option_parser.cpp b/src/option_parser.cpp index f721f6809..01e13f4a1 100644 --- a/src/option_parser.cpp +++ b/src/option_parser.cpp @@ -298,7 +298,10 @@ usage(void) " -o nullrw= Disables reads and writes. For benchmarking.\n" " -o ignorepponrename=\n" " Ignore path preserving when performing renames\n" - " and links. default = false" + " and links. default = false\n" + " -o threads= number of worker threads. 0 = autodetect.\n" + " Negative values autodetect then divide by\n" + " absolute value. default = 0\n" << std::endl; }