Skip to content

Commit

Permalink
Merge pull request #424 from trapexit/threading
Browse files Browse the repository at this point in the history
Threading
  • Loading branch information
trapexit authored Jul 3, 2017
2 parents deb70c7 + 75ed37a commit c35a4da
Show file tree
Hide file tree
Showing 9 changed files with 62 additions and 70 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ mergerfs -o<options> <srcmounts> <mountpoint>
* **symlinkify_timeout**: time to wait, in seconds, to activate the **symlinkify** behavior. (default: 3600)
* **nullrw**: turns reads and writes into no-ops. The request will succeed but do nothing. Useful for benchmarking mergerfs. (default: false)
* **ignorepponrename**: ignore path preserving on rename. Typically rename and link act differently depending on the policy of `create` (read below). Enabling this will cause rename and link to always use the non-path preserving behavior. This means files, when renamed or linked, will stay on the same drive.
* **threads**: number of threads to use in multithreaded mode. When set to zero (the default) it will attempt to discover and use the number of logical cores. If the lookup fails it will fall back to using 4. If the thread count is set negative it will look up the number of cores then divide by the absolute value. ie. threads=-2 on an 8 core machine will result in 8 / 2 = 4 threads. There will always be at least 1 thread. NOTE: higher number of threads increases parallelism but usually decreases throughput.
* **fsname**: sets the name of the filesystem as seen in **mount**, **df**, etc. Defaults to a list of the source paths concatenated together with the longest common prefix removed.
* **func.<func>=<policy>**: sets the specific FUSE function's policy. See below for the list of value types. Example: **func.getattr=newest**
* **category.<category>=<policy>**: Sets policy of all FUSE functions in the provided category. Example: **category.create=mfs**
Expand Down
4 changes: 3 additions & 1 deletion libfuse/include/fuse.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ struct fuse_operations {
*/
int (*mknod) (const char *, mode_t, dev_t);

/** Create a directory
/** Create a directory
*
* Note that the mode argument may not have the type specification
* bits set, i.e. S_ISDIR(mode) can be false. To obtain the
Expand Down Expand Up @@ -696,6 +696,8 @@ int fuse_loop(struct fuse *f);
*/
void fuse_exit(struct fuse *f);

int fuse_config_num_threads(const struct fuse *f);

/**
* FUSE event loop with multiple threads
*
Expand Down
2 changes: 1 addition & 1 deletion libfuse/include/fuse_lowlevel.h
Original file line number Diff line number Diff line change
Expand Up @@ -1700,7 +1700,7 @@ int fuse_session_loop(struct fuse_session *se);
* @param se the session
* @return 0 on success, -1 on error
*/
int fuse_session_loop_mt(struct fuse_session *se);
int fuse_session_loop_mt(struct fuse_session *se, const int threads);

/* ----------------------------------------------------------- *
* Channel interface *
Expand Down
2 changes: 1 addition & 1 deletion libfuse/lib/cuse_lowlevel.c
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ int cuse_lowlevel_main(int argc, char *argv[], const struct cuse_info *ci,
return 1;

if (multithreaded)
res = fuse_session_loop_mt(se);
res = fuse_session_loop_mt(se, 0);
else
res = fuse_session_loop(se);

Expand Down
7 changes: 7 additions & 0 deletions libfuse/lib/fuse.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ struct fuse_config {
int intr_signal;
int help;
char *modules;
int threads;
};

struct fuse_fs {
Expand Down Expand Up @@ -4405,6 +4406,7 @@ static const struct fuse_opt fuse_lib_opts[] = {
FUSE_LIB_OPT("intr", intr, 1),
FUSE_LIB_OPT("intr_signal=%d", intr_signal, 0),
FUSE_LIB_OPT("modules=%s", modules, 0),
FUSE_LIB_OPT("threads=%d", threads, 0),
FUSE_OPT_END
};

Expand Down Expand Up @@ -4900,3 +4902,8 @@ struct fuse *fuse_new_compat25(int fd, struct fuse_args *args,
}

FUSE_SYMVER(".symver fuse_new_compat25,fuse_new@FUSE_2.5");

int fuse_config_num_threads(const struct fuse *f)
{
return f->conf.threads;
}
93 changes: 29 additions & 64 deletions libfuse/lib/fuse_loop_mt.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <semaphore.h>
#include <errno.h>
#include <sys/time.h>
#include <unistd.h>

/* Environment var controlling the thread stack size */
#define ENVNAME_THREAD_STACK "FUSE_THREAD_STACK"
Expand All @@ -33,9 +34,6 @@ struct fuse_worker {
};

struct fuse_mt {
pthread_mutex_t lock;
int numworker;
int numavail;
struct fuse_session *se;
struct fuse_chan *prevch;
struct fuse_worker main;
Expand Down Expand Up @@ -65,11 +63,10 @@ static int fuse_loop_start_thread(struct fuse_mt *mt);

static void *fuse_do_work(void *data)
{
struct fuse_worker *w = (struct fuse_worker *) data;
struct fuse_mt *mt = w->mt;
struct fuse_worker *w = (struct fuse_worker *) data;
struct fuse_mt *mt = w->mt;

while (!fuse_session_exited(mt->se)) {
int isforget = 0;
struct fuse_chan *ch = mt->prevch;
struct fuse_buf fbuf = {
.mem = w->buf,
Expand All @@ -90,51 +87,10 @@ static void *fuse_do_work(void *data)
break;
}

pthread_mutex_lock(&mt->lock);
if (mt->exit) {
pthread_mutex_unlock(&mt->lock);
if (mt->exit)
return NULL;
}

/*
* This disgusting hack is needed so that zillions of threads
* are not created on a burst of FORGET messages
*/
if (!(fbuf.flags & FUSE_BUF_IS_FD)) {
struct fuse_in_header *in = fbuf.mem;

if (in->opcode == FUSE_FORGET ||
in->opcode == FUSE_BATCH_FORGET)
isforget = 1;
}

if (!isforget)
mt->numavail--;
if (mt->numavail == 0)
fuse_loop_start_thread(mt);
pthread_mutex_unlock(&mt->lock);

fuse_session_process_buf(mt->se, &fbuf, ch);

pthread_mutex_lock(&mt->lock);
if (!isforget)
mt->numavail++;
if (mt->numavail > 10) {
if (mt->exit) {
pthread_mutex_unlock(&mt->lock);
return NULL;
}
list_del_worker(w);
mt->numavail--;
mt->numworker--;
pthread_mutex_unlock(&mt->lock);

pthread_detach(w->thread_id);
free(w->buf);
free(w);
return NULL;
}
pthread_mutex_unlock(&mt->lock);
}

sem_post(&mt->finish);
Expand Down Expand Up @@ -200,60 +156,69 @@ static int fuse_loop_start_thread(struct fuse_mt *mt)
return -1;
}
list_add_worker(w, &mt->main);
mt->numavail ++;
mt->numworker ++;

return 0;
}

static void fuse_join_worker(struct fuse_mt *mt, struct fuse_worker *w)
static void fuse_join_worker(struct fuse_worker *w)
{
pthread_join(w->thread_id, NULL);
pthread_mutex_lock(&mt->lock);
list_del_worker(w);
pthread_mutex_unlock(&mt->lock);
free(w->buf);
free(w);
}

int fuse_session_loop_mt(struct fuse_session *se)
static int number_of_threads(void)
{
#ifdef _SC_NPROCESSORS_ONLN
return sysconf(_SC_NPROCESSORS_ONLN);
#endif

return 4;
}

int fuse_session_loop_mt(struct fuse_session *se,
const int _threads)
{
int i;
int err;
int threads;
struct fuse_mt mt;
struct fuse_worker *w;

memset(&mt, 0, sizeof(struct fuse_mt));
mt.se = se;
mt.prevch = fuse_session_next_chan(se, NULL);
mt.error = 0;
mt.numworker = 0;
mt.numavail = 0;
mt.main.thread_id = pthread_self();
mt.main.prev = mt.main.next = &mt.main;
sem_init(&mt.finish, 0, 0);
fuse_mutex_init(&mt.lock);

pthread_mutex_lock(&mt.lock);
err = fuse_loop_start_thread(&mt);
pthread_mutex_unlock(&mt.lock);
threads = ((_threads > 0) ? _threads : number_of_threads());
if(_threads < 0)
threads /= -_threads;
if(threads == 0)
threads = 1;

err = 0;
for(i = 0; (i < threads) && !err; i++)
err = fuse_loop_start_thread(&mt);

if (!err) {
/* sem_wait() is interruptible */
while (!fuse_session_exited(se))
sem_wait(&mt.finish);

pthread_mutex_lock(&mt.lock);
for (w = mt.main.next; w != &mt.main; w = w->next)
pthread_cancel(w->thread_id);
mt.exit = 1;
pthread_mutex_unlock(&mt.lock);

while (mt.main.next != &mt.main)
fuse_join_worker(&mt, mt.main.next);
fuse_join_worker(mt.main.next);

err = mt.error;
}

pthread_mutex_destroy(&mt.lock);
sem_destroy(&mt.finish);
fuse_session_reset(se);
return err;
Expand Down
6 changes: 4 additions & 2 deletions libfuse/lib/fuse_mt.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ int fuse_loop_mt_proc(struct fuse *f, fuse_processor_t proc, void *data)
return -1;
}
fuse_session_add_chan(se, ch);
res = fuse_session_loop_mt(se);
res = fuse_session_loop_mt(se,
fuse_config_num_threads(f));
fuse_session_destroy(se);
return res;
}
Expand All @@ -114,7 +115,8 @@ int fuse_loop_mt(struct fuse *f)
if (res)
return -1;

res = fuse_session_loop_mt(fuse_get_session(f));
res = fuse_session_loop_mt(fuse_get_session(f),
fuse_config_num_threads(f));
fuse_stop_cleanup_thread(f);
return res;
}
Expand Down
12 changes: 12 additions & 0 deletions man/mergerfs.1
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,18 @@ Enabling this will cause rename and link to always use the non\-path
preserving behavior.
This means files, when renamed or linked, will stay on the same drive.
.IP \[bu] 2
\f[B]threads\f[]: number of threads to use in multithreaded mode.
When set to zero (the default) it will attempt to discover and use the
number of logical cores.
If the lookup fails it will fall back to using 4.
If the thread count is set negative it will look up the number of cores
then divide by the absolute value.
ie.
threads=\-2 on an 8 core machine will result in 8 / 2 = 4 threads.
There will always be at least 1 thread.
NOTE: higher number of threads increases parallelism but usually
decreases throughput.
.IP \[bu] 2
\f[B]fsname\f[]: sets the name of the filesystem as seen in
\f[B]mount\f[], \f[B]df\f[], etc.
Defaults to a list of the source paths concatenated together with the
Expand Down
5 changes: 4 additions & 1 deletion src/option_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,10 @@ usage(void)
" -o nullrw=<bool> Disables reads and writes. For benchmarking.\n"
" -o ignorepponrename=<bool>\n"
" Ignore path preserving when performing renames\n"
" and links. default = false"
" and links. default = false\n"
" -o threads=<int> number of worker threads. 0 = autodetect.\n"
" Negative values autodetect then divide by\n"
" absolute value. default = 0\n"
<< std::endl;
}

Expand Down

0 comments on commit c35a4da

Please sign in to comment.