From 10817bf09d5f8cb22711fb0ee8d8da49f6f05f89 Mon Sep 17 00:00:00 2001 From: "Daniel P. Berrange" Date: Tue, 1 Sep 2015 14:48:02 +0100 Subject: coroutine: move into libqemuutil.a library The coroutine files are currently referenced by the block-obj-y variable. The coroutine functionality though is already used by more than just the block code. eg migration code uses coroutine yield. In the future the I/O channel code will also use the coroutine yield functionality. Since the coroutine code is nicely self-contained it can be easily built as part of the libqemuutil.a library, making it widely available. The headers are also moved into include/qemu, instead of the include/block directory, since they are now part of the util codebase, and the impl was never in the block/ directory either. Signed-off-by: Daniel P. Berrange --- util/Makefile.objs | 3 + util/coroutine-gthread.c | 198 +++++++++++++++++++++++++++++ util/coroutine-sigaltstack.c | 293 +++++++++++++++++++++++++++++++++++++++++++ util/coroutine-ucontext.c | 194 ++++++++++++++++++++++++++++ util/coroutine-win32.c | 101 +++++++++++++++ util/qemu-coroutine-io.c | 91 ++++++++++++++ util/qemu-coroutine-lock.c | 186 +++++++++++++++++++++++++++ util/qemu-coroutine-sleep.c | 41 ++++++ util/qemu-coroutine.c | 146 +++++++++++++++++++++ 9 files changed, 1253 insertions(+) create mode 100644 util/coroutine-gthread.c create mode 100644 util/coroutine-sigaltstack.c create mode 100644 util/coroutine-ucontext.c create mode 100644 util/coroutine-win32.c create mode 100644 util/qemu-coroutine-io.c create mode 100644 util/qemu-coroutine-lock.c create mode 100644 util/qemu-coroutine-sleep.c create mode 100644 util/qemu-coroutine.c (limited to 'util') diff --git a/util/Makefile.objs b/util/Makefile.objs index 114d657..d8d7e7a 100644 --- a/util/Makefile.objs +++ b/util/Makefile.objs @@ -18,3 +18,6 @@ util-obj-y += getauxval.o util-obj-y += readline.o util-obj-y += rfifolock.o util-obj-y += rcu.o +util-obj-y += qemu-coroutine.o qemu-coroutine-lock.o qemu-coroutine-io.o +util-obj-y += qemu-coroutine-sleep.o +util-obj-y += coroutine-$(CONFIG_COROUTINE_BACKEND).o diff --git a/util/coroutine-gthread.c b/util/coroutine-gthread.c new file mode 100644 index 0000000..0bcd778 --- /dev/null +++ b/util/coroutine-gthread.c @@ -0,0 +1,198 @@ +/* + * GThread coroutine initialization code + * + * Copyright (C) 2006 Anthony Liguori + * Copyright (C) 2011 Aneesh Kumar K.V + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.0 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, see . + */ + +#include +#include "qemu-common.h" +#include "qemu/coroutine_int.h" + +typedef struct { + Coroutine base; + GThread *thread; + bool runnable; + bool free_on_thread_exit; + CoroutineAction action; +} CoroutineGThread; + +static CompatGMutex coroutine_lock; +static CompatGCond coroutine_cond; + +/* GLib 2.31 and beyond deprecated various parts of the thread API, + * but the new interfaces are not available in older GLib versions + * so we have to cope with both. + */ +#if GLIB_CHECK_VERSION(2, 31, 0) +/* Awkwardly, the GPrivate API doesn't provide a way to update the + * GDestroyNotify handler for the coroutine key dynamically. So instead + * we track whether or not the CoroutineGThread should be freed on + * thread exit / coroutine key update using the free_on_thread_exit + * field. + */ +static void coroutine_destroy_notify(gpointer data) +{ + CoroutineGThread *co = data; + if (co && co->free_on_thread_exit) { + g_free(co); + } +} + +static GPrivate coroutine_key = G_PRIVATE_INIT(coroutine_destroy_notify); + +static inline CoroutineGThread *get_coroutine_key(void) +{ + return g_private_get(&coroutine_key); +} + +static inline void set_coroutine_key(CoroutineGThread *co, + bool free_on_thread_exit) +{ + /* Unlike g_static_private_set() this does not call the GDestroyNotify + * if the previous value of the key was NULL. Fortunately we only need + * the GDestroyNotify in the non-NULL key case. + */ + co->free_on_thread_exit = free_on_thread_exit; + g_private_replace(&coroutine_key, co); +} + +static inline GThread *create_thread(GThreadFunc func, gpointer data) +{ + return g_thread_new("coroutine", func, data); +} + +#else + +/* Handle older GLib versions */ + +static GStaticPrivate coroutine_key = G_STATIC_PRIVATE_INIT; + +static inline CoroutineGThread *get_coroutine_key(void) +{ + return g_static_private_get(&coroutine_key); +} + +static inline void set_coroutine_key(CoroutineGThread *co, + bool free_on_thread_exit) +{ + g_static_private_set(&coroutine_key, co, + free_on_thread_exit ? (GDestroyNotify)g_free : NULL); +} + +static inline GThread *create_thread(GThreadFunc func, gpointer data) +{ + return g_thread_create_full(func, data, 0, TRUE, TRUE, + G_THREAD_PRIORITY_NORMAL, NULL); +} + +#endif + + +static void __attribute__((constructor)) coroutine_init(void) +{ +#if !GLIB_CHECK_VERSION(2, 31, 0) + if (!g_thread_supported()) { + g_thread_init(NULL); + } +#endif +} + +static void coroutine_wait_runnable_locked(CoroutineGThread *co) +{ + while (!co->runnable) { + g_cond_wait(&coroutine_cond, &coroutine_lock); + } +} + +static void coroutine_wait_runnable(CoroutineGThread *co) +{ + g_mutex_lock(&coroutine_lock); + coroutine_wait_runnable_locked(co); + g_mutex_unlock(&coroutine_lock); +} + +static gpointer coroutine_thread(gpointer opaque) +{ + CoroutineGThread *co = opaque; + + set_coroutine_key(co, false); + coroutine_wait_runnable(co); + co->base.entry(co->base.entry_arg); + qemu_coroutine_switch(&co->base, co->base.caller, COROUTINE_TERMINATE); + return NULL; +} + +Coroutine *qemu_coroutine_new(void) +{ + CoroutineGThread *co; + + co = g_malloc0(sizeof(*co)); + co->thread = create_thread(coroutine_thread, co); + if (!co->thread) { + g_free(co); + return NULL; + } + return &co->base; +} + +void qemu_coroutine_delete(Coroutine *co_) +{ + CoroutineGThread *co = DO_UPCAST(CoroutineGThread, base, co_); + + g_thread_join(co->thread); + g_free(co); +} + +CoroutineAction qemu_coroutine_switch(Coroutine *from_, + Coroutine *to_, + CoroutineAction action) +{ + CoroutineGThread *from = DO_UPCAST(CoroutineGThread, base, from_); + CoroutineGThread *to = DO_UPCAST(CoroutineGThread, base, to_); + + g_mutex_lock(&coroutine_lock); + from->runnable = false; + from->action = action; + to->runnable = true; + to->action = action; + g_cond_broadcast(&coroutine_cond); + + if (action != COROUTINE_TERMINATE) { + coroutine_wait_runnable_locked(from); + } + g_mutex_unlock(&coroutine_lock); + return from->action; +} + +Coroutine *qemu_coroutine_self(void) +{ + CoroutineGThread *co = get_coroutine_key(); + if (!co) { + co = g_malloc0(sizeof(*co)); + co->runnable = true; + set_coroutine_key(co, true); + } + + return &co->base; +} + +bool qemu_in_coroutine(void) +{ + CoroutineGThread *co = get_coroutine_key(); + + return co && co->base.caller; +} diff --git a/util/coroutine-sigaltstack.c b/util/coroutine-sigaltstack.c new file mode 100644 index 0000000..39842a4 --- /dev/null +++ b/util/coroutine-sigaltstack.c @@ -0,0 +1,293 @@ +/* + * sigaltstack coroutine initialization code + * + * Copyright (C) 2006 Anthony Liguori + * Copyright (C) 2011 Kevin Wolf + * Copyright (C) 2012 Alex Barcelo +** This file is partly based on pth_mctx.c, from the GNU Portable Threads +** Copyright (c) 1999-2006 Ralf S. Engelschall + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, see . + */ + +/* XXX Is there a nicer way to disable glibc's stack check for longjmp? */ +#ifdef _FORTIFY_SOURCE +#undef _FORTIFY_SOURCE +#endif +#include +#include +#include +#include +#include +#include "qemu-common.h" +#include "qemu/coroutine_int.h" + +typedef struct { + Coroutine base; + void *stack; + sigjmp_buf env; +} CoroutineUContext; + +/** + * Per-thread coroutine bookkeeping + */ +typedef struct { + /** Currently executing coroutine */ + Coroutine *current; + + /** The default coroutine */ + CoroutineUContext leader; + + /** Information for the signal handler (trampoline) */ + sigjmp_buf tr_reenter; + volatile sig_atomic_t tr_called; + void *tr_handler; +} CoroutineThreadState; + +static pthread_key_t thread_state_key; + +static CoroutineThreadState *coroutine_get_thread_state(void) +{ + CoroutineThreadState *s = pthread_getspecific(thread_state_key); + + if (!s) { + s = g_malloc0(sizeof(*s)); + s->current = &s->leader.base; + pthread_setspecific(thread_state_key, s); + } + return s; +} + +static void qemu_coroutine_thread_cleanup(void *opaque) +{ + CoroutineThreadState *s = opaque; + + g_free(s); +} + +static void __attribute__((constructor)) coroutine_init(void) +{ + int ret; + + ret = pthread_key_create(&thread_state_key, qemu_coroutine_thread_cleanup); + if (ret != 0) { + fprintf(stderr, "unable to create leader key: %s\n", strerror(errno)); + abort(); + } +} + +/* "boot" function + * This is what starts the coroutine, is called from the trampoline + * (from the signal handler when it is not signal handling, read ahead + * for more information). + */ +static void coroutine_bootstrap(CoroutineUContext *self, Coroutine *co) +{ + /* Initialize longjmp environment and switch back the caller */ + if (!sigsetjmp(self->env, 0)) { + siglongjmp(*(sigjmp_buf *)co->entry_arg, 1); + } + + while (true) { + co->entry(co->entry_arg); + qemu_coroutine_switch(co, co->caller, COROUTINE_TERMINATE); + } +} + +/* + * This is used as the signal handler. This is called with the brand new stack + * (thanks to sigaltstack). We have to return, given that this is a signal + * handler and the sigmask and some other things are changed. + */ +static void coroutine_trampoline(int signal) +{ + CoroutineUContext *self; + Coroutine *co; + CoroutineThreadState *coTS; + + /* Get the thread specific information */ + coTS = coroutine_get_thread_state(); + self = coTS->tr_handler; + coTS->tr_called = 1; + co = &self->base; + + /* + * Here we have to do a bit of a ping pong between the caller, given that + * this is a signal handler and we have to do a return "soon". Then the + * caller can reestablish everything and do a siglongjmp here again. + */ + if (!sigsetjmp(coTS->tr_reenter, 0)) { + return; + } + + /* + * Ok, the caller has siglongjmp'ed back to us, so now prepare + * us for the real machine state switching. We have to jump + * into another function here to get a new stack context for + * the auto variables (which have to be auto-variables + * because the start of the thread happens later). Else with + * PIC (i.e. Position Independent Code which is used when PTH + * is built as a shared library) most platforms would + * horrible core dump as experience showed. + */ + coroutine_bootstrap(self, co); +} + +Coroutine *qemu_coroutine_new(void) +{ + const size_t stack_size = 1 << 20; + CoroutineUContext *co; + CoroutineThreadState *coTS; + struct sigaction sa; + struct sigaction osa; + stack_t ss; + stack_t oss; + sigset_t sigs; + sigset_t osigs; + sigjmp_buf old_env; + + /* The way to manipulate stack is with the sigaltstack function. We + * prepare a stack, with it delivering a signal to ourselves and then + * put sigsetjmp/siglongjmp where needed. + * This has been done keeping coroutine-ucontext as a model and with the + * pth ideas (GNU Portable Threads). See coroutine-ucontext for the basics + * of the coroutines and see pth_mctx.c (from the pth project) for the + * sigaltstack way of manipulating stacks. + */ + + co = g_malloc0(sizeof(*co)); + co->stack = g_malloc(stack_size); + co->base.entry_arg = &old_env; /* stash away our jmp_buf */ + + coTS = coroutine_get_thread_state(); + coTS->tr_handler = co; + + /* + * Preserve the SIGUSR2 signal state, block SIGUSR2, + * and establish our signal handler. The signal will + * later transfer control onto the signal stack. + */ + sigemptyset(&sigs); + sigaddset(&sigs, SIGUSR2); + pthread_sigmask(SIG_BLOCK, &sigs, &osigs); + sa.sa_handler = coroutine_trampoline; + sigfillset(&sa.sa_mask); + sa.sa_flags = SA_ONSTACK; + if (sigaction(SIGUSR2, &sa, &osa) != 0) { + abort(); + } + + /* + * Set the new stack. + */ + ss.ss_sp = co->stack; + ss.ss_size = stack_size; + ss.ss_flags = 0; + if (sigaltstack(&ss, &oss) < 0) { + abort(); + } + + /* + * Now transfer control onto the signal stack and set it up. + * It will return immediately via "return" after the sigsetjmp() + * was performed. Be careful here with race conditions. The + * signal can be delivered the first time sigsuspend() is + * called. + */ + coTS->tr_called = 0; + pthread_kill(pthread_self(), SIGUSR2); + sigfillset(&sigs); + sigdelset(&sigs, SIGUSR2); + while (!coTS->tr_called) { + sigsuspend(&sigs); + } + + /* + * Inform the system that we are back off the signal stack by + * removing the alternative signal stack. Be careful here: It + * first has to be disabled, before it can be removed. + */ + sigaltstack(NULL, &ss); + ss.ss_flags = SS_DISABLE; + if (sigaltstack(&ss, NULL) < 0) { + abort(); + } + sigaltstack(NULL, &ss); + if (!(oss.ss_flags & SS_DISABLE)) { + sigaltstack(&oss, NULL); + } + + /* + * Restore the old SIGUSR2 signal handler and mask + */ + sigaction(SIGUSR2, &osa, NULL); + pthread_sigmask(SIG_SETMASK, &osigs, NULL); + + /* + * Now enter the trampoline again, but this time not as a signal + * handler. Instead we jump into it directly. The functionally + * redundant ping-pong pointer arithmetic is necessary to avoid + * type-conversion warnings related to the `volatile' qualifier and + * the fact that `jmp_buf' usually is an array type. + */ + if (!sigsetjmp(old_env, 0)) { + siglongjmp(coTS->tr_reenter, 1); + } + + /* + * Ok, we returned again, so now we're finished + */ + + return &co->base; +} + +void qemu_coroutine_delete(Coroutine *co_) +{ + CoroutineUContext *co = DO_UPCAST(CoroutineUContext, base, co_); + + g_free(co->stack); + g_free(co); +} + +CoroutineAction qemu_coroutine_switch(Coroutine *from_, Coroutine *to_, + CoroutineAction action) +{ + CoroutineUContext *from = DO_UPCAST(CoroutineUContext, base, from_); + CoroutineUContext *to = DO_UPCAST(CoroutineUContext, base, to_); + CoroutineThreadState *s = coroutine_get_thread_state(); + int ret; + + s->current = to_; + + ret = sigsetjmp(from->env, 0); + if (ret == 0) { + siglongjmp(to->env, action); + } + return ret; +} + +Coroutine *qemu_coroutine_self(void) +{ + CoroutineThreadState *s = coroutine_get_thread_state(); + + return s->current; +} + +bool qemu_in_coroutine(void) +{ + CoroutineThreadState *s = pthread_getspecific(thread_state_key); + + return s && s->current->caller; +} + diff --git a/util/coroutine-ucontext.c b/util/coroutine-ucontext.c new file mode 100644 index 0000000..26cbebb --- /dev/null +++ b/util/coroutine-ucontext.c @@ -0,0 +1,194 @@ +/* + * ucontext coroutine initialization code + * + * Copyright (C) 2006 Anthony Liguori + * Copyright (C) 2011 Kevin Wolf + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.0 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, see . + */ + +/* XXX Is there a nicer way to disable glibc's stack check for longjmp? */ +#ifdef _FORTIFY_SOURCE +#undef _FORTIFY_SOURCE +#endif +#include +#include +#include +#include +#include "qemu-common.h" +#include "qemu/coroutine_int.h" + +#ifdef CONFIG_VALGRIND_H +#include +#endif + +typedef struct { + Coroutine base; + void *stack; + sigjmp_buf env; + +#ifdef CONFIG_VALGRIND_H + unsigned int valgrind_stack_id; +#endif + +} CoroutineUContext; + +/** + * Per-thread coroutine bookkeeping + */ +static __thread CoroutineUContext leader; +static __thread Coroutine *current; + +/* + * va_args to makecontext() must be type 'int', so passing + * the pointer we need may require several int args. This + * union is a quick hack to let us do that + */ +union cc_arg { + void *p; + int i[2]; +}; + +static void coroutine_trampoline(int i0, int i1) +{ + union cc_arg arg; + CoroutineUContext *self; + Coroutine *co; + + arg.i[0] = i0; + arg.i[1] = i1; + self = arg.p; + co = &self->base; + + /* Initialize longjmp environment and switch back the caller */ + if (!sigsetjmp(self->env, 0)) { + siglongjmp(*(sigjmp_buf *)co->entry_arg, 1); + } + + while (true) { + co->entry(co->entry_arg); + qemu_coroutine_switch(co, co->caller, COROUTINE_TERMINATE); + } +} + +Coroutine *qemu_coroutine_new(void) +{ + const size_t stack_size = 1 << 20; + CoroutineUContext *co; + ucontext_t old_uc, uc; + sigjmp_buf old_env; + union cc_arg arg = {0}; + + /* The ucontext functions preserve signal masks which incurs a + * system call overhead. sigsetjmp(buf, 0)/siglongjmp() does not + * preserve signal masks but only works on the current stack. + * Since we need a way to create and switch to a new stack, use + * the ucontext functions for that but sigsetjmp()/siglongjmp() for + * everything else. + */ + + if (getcontext(&uc) == -1) { + abort(); + } + + co = g_malloc0(sizeof(*co)); + co->stack = g_malloc(stack_size); + co->base.entry_arg = &old_env; /* stash away our jmp_buf */ + + uc.uc_link = &old_uc; + uc.uc_stack.ss_sp = co->stack; + uc.uc_stack.ss_size = stack_size; + uc.uc_stack.ss_flags = 0; + +#ifdef CONFIG_VALGRIND_H + co->valgrind_stack_id = + VALGRIND_STACK_REGISTER(co->stack, co->stack + stack_size); +#endif + + arg.p = co; + + makecontext(&uc, (void (*)(void))coroutine_trampoline, + 2, arg.i[0], arg.i[1]); + + /* swapcontext() in, siglongjmp() back out */ + if (!sigsetjmp(old_env, 0)) { + swapcontext(&old_uc, &uc); + } + return &co->base; +} + +#ifdef CONFIG_VALGRIND_H +#ifdef CONFIG_PRAGMA_DIAGNOSTIC_AVAILABLE +/* Work around an unused variable in the valgrind.h macro... */ +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wunused-but-set-variable" +#endif +static inline void valgrind_stack_deregister(CoroutineUContext *co) +{ + VALGRIND_STACK_DEREGISTER(co->valgrind_stack_id); +} +#ifdef CONFIG_PRAGMA_DIAGNOSTIC_AVAILABLE +#pragma GCC diagnostic pop +#endif +#endif + +void qemu_coroutine_delete(Coroutine *co_) +{ + CoroutineUContext *co = DO_UPCAST(CoroutineUContext, base, co_); + +#ifdef CONFIG_VALGRIND_H + valgrind_stack_deregister(co); +#endif + + g_free(co->stack); + g_free(co); +} + +/* This function is marked noinline to prevent GCC from inlining it + * into coroutine_trampoline(). If we allow it to do that then it + * hoists the code to get the address of the TLS variable "current" + * out of the while() loop. This is an invalid transformation because + * the sigsetjmp() call may be called when running thread A but + * return in thread B, and so we might be in a different thread + * context each time round the loop. + */ +CoroutineAction __attribute__((noinline)) +qemu_coroutine_switch(Coroutine *from_, Coroutine *to_, + CoroutineAction action) +{ + CoroutineUContext *from = DO_UPCAST(CoroutineUContext, base, from_); + CoroutineUContext *to = DO_UPCAST(CoroutineUContext, base, to_); + int ret; + + current = to_; + + ret = sigsetjmp(from->env, 0); + if (ret == 0) { + siglongjmp(to->env, action); + } + return ret; +} + +Coroutine *qemu_coroutine_self(void) +{ + if (!current) { + current = &leader.base; + } + return current; +} + +bool qemu_in_coroutine(void) +{ + return current && current->caller; +} diff --git a/util/coroutine-win32.c b/util/coroutine-win32.c new file mode 100644 index 0000000..4f922c5 --- /dev/null +++ b/util/coroutine-win32.c @@ -0,0 +1,101 @@ +/* + * Win32 coroutine initialization code + * + * Copyright (c) 2011 Kevin Wolf + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +#include "qemu-common.h" +#include "qemu/coroutine_int.h" + +typedef struct +{ + Coroutine base; + + LPVOID fiber; + CoroutineAction action; +} CoroutineWin32; + +static __thread CoroutineWin32 leader; +static __thread Coroutine *current; + +/* This function is marked noinline to prevent GCC from inlining it + * into coroutine_trampoline(). If we allow it to do that then it + * hoists the code to get the address of the TLS variable "current" + * out of the while() loop. This is an invalid transformation because + * the SwitchToFiber() call may be called when running thread A but + * return in thread B, and so we might be in a different thread + * context each time round the loop. + */ +CoroutineAction __attribute__((noinline)) +qemu_coroutine_switch(Coroutine *from_, Coroutine *to_, + CoroutineAction action) +{ + CoroutineWin32 *from = DO_UPCAST(CoroutineWin32, base, from_); + CoroutineWin32 *to = DO_UPCAST(CoroutineWin32, base, to_); + + current = to_; + + to->action = action; + SwitchToFiber(to->fiber); + return from->action; +} + +static void CALLBACK coroutine_trampoline(void *co_) +{ + Coroutine *co = co_; + + while (true) { + co->entry(co->entry_arg); + qemu_coroutine_switch(co, co->caller, COROUTINE_TERMINATE); + } +} + +Coroutine *qemu_coroutine_new(void) +{ + const size_t stack_size = 1 << 20; + CoroutineWin32 *co; + + co = g_malloc0(sizeof(*co)); + co->fiber = CreateFiber(stack_size, coroutine_trampoline, &co->base); + return &co->base; +} + +void qemu_coroutine_delete(Coroutine *co_) +{ + CoroutineWin32 *co = DO_UPCAST(CoroutineWin32, base, co_); + + DeleteFiber(co->fiber); + g_free(co); +} + +Coroutine *qemu_coroutine_self(void) +{ + if (!current) { + current = &leader.base; + leader.fiber = ConvertThreadToFiber(NULL); + } + return current; +} + +bool qemu_in_coroutine(void) +{ + return current && current->caller; +} diff --git a/util/qemu-coroutine-io.c b/util/qemu-coroutine-io.c new file mode 100644 index 0000000..e1eae73 --- /dev/null +++ b/util/qemu-coroutine-io.c @@ -0,0 +1,91 @@ +/* + * Coroutine-aware I/O functions + * + * Copyright (C) 2009-2010 Nippon Telegraph and Telephone Corporation. + * Copyright (c) 2011, Red Hat, Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +#include "qemu-common.h" +#include "qemu/sockets.h" +#include "qemu/coroutine.h" +#include "qemu/iov.h" +#include "qemu/main-loop.h" + +ssize_t coroutine_fn +qemu_co_sendv_recvv(int sockfd, struct iovec *iov, unsigned iov_cnt, + size_t offset, size_t bytes, bool do_send) +{ + size_t done = 0; + ssize_t ret; + int err; + while (done < bytes) { + ret = iov_send_recv(sockfd, iov, iov_cnt, + offset + done, bytes - done, do_send); + if (ret > 0) { + done += ret; + } else if (ret < 0) { + err = socket_error(); + if (err == EAGAIN || err == EWOULDBLOCK) { + qemu_coroutine_yield(); + } else if (done == 0) { + return -err; + } else { + break; + } + } else if (ret == 0 && !do_send) { + /* write (send) should never return 0. + * read (recv) returns 0 for end-of-file (-data). + * In both cases there's little point retrying, + * but we do for write anyway, just in case */ + break; + } + } + return done; +} + +ssize_t coroutine_fn +qemu_co_send_recv(int sockfd, void *buf, size_t bytes, bool do_send) +{ + struct iovec iov = { .iov_base = buf, .iov_len = bytes }; + return qemu_co_sendv_recvv(sockfd, &iov, 1, 0, bytes, do_send); +} + +typedef struct { + Coroutine *co; + int fd; +} FDYieldUntilData; + +static void fd_coroutine_enter(void *opaque) +{ + FDYieldUntilData *data = opaque; + qemu_set_fd_handler(data->fd, NULL, NULL, NULL); + qemu_coroutine_enter(data->co, NULL); +} + +void coroutine_fn yield_until_fd_readable(int fd) +{ + FDYieldUntilData data; + + assert(qemu_in_coroutine()); + data.co = qemu_coroutine_self(); + data.fd = fd; + qemu_set_fd_handler(fd, fd_coroutine_enter, NULL, &data); + qemu_coroutine_yield(); +} diff --git a/util/qemu-coroutine-lock.c b/util/qemu-coroutine-lock.c new file mode 100644 index 0000000..130ee19 --- /dev/null +++ b/util/qemu-coroutine-lock.c @@ -0,0 +1,186 @@ +/* + * coroutine queues and locks + * + * Copyright (c) 2011 Kevin Wolf + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +#include "qemu-common.h" +#include "qemu/coroutine.h" +#include "qemu/coroutine_int.h" +#include "qemu/queue.h" +#include "trace.h" + +void qemu_co_queue_init(CoQueue *queue) +{ + QTAILQ_INIT(&queue->entries); +} + +void coroutine_fn qemu_co_queue_wait(CoQueue *queue) +{ + Coroutine *self = qemu_coroutine_self(); + QTAILQ_INSERT_TAIL(&queue->entries, self, co_queue_next); + qemu_coroutine_yield(); + assert(qemu_in_coroutine()); +} + +/** + * qemu_co_queue_run_restart: + * + * Enter each coroutine that was previously marked for restart by + * qemu_co_queue_next() or qemu_co_queue_restart_all(). This function is + * invoked by the core coroutine code when the current coroutine yields or + * terminates. + */ +void qemu_co_queue_run_restart(Coroutine *co) +{ + Coroutine *next; + + trace_qemu_co_queue_run_restart(co); + while ((next = QTAILQ_FIRST(&co->co_queue_wakeup))) { + QTAILQ_REMOVE(&co->co_queue_wakeup, next, co_queue_next); + qemu_coroutine_enter(next, NULL); + } +} + +static bool qemu_co_queue_do_restart(CoQueue *queue, bool single) +{ + Coroutine *self = qemu_coroutine_self(); + Coroutine *next; + + if (QTAILQ_EMPTY(&queue->entries)) { + return false; + } + + while ((next = QTAILQ_FIRST(&queue->entries)) != NULL) { + QTAILQ_REMOVE(&queue->entries, next, co_queue_next); + QTAILQ_INSERT_TAIL(&self->co_queue_wakeup, next, co_queue_next); + trace_qemu_co_queue_next(next); + if (single) { + break; + } + } + return true; +} + +bool coroutine_fn qemu_co_queue_next(CoQueue *queue) +{ + assert(qemu_in_coroutine()); + return qemu_co_queue_do_restart(queue, true); +} + +void coroutine_fn qemu_co_queue_restart_all(CoQueue *queue) +{ + assert(qemu_in_coroutine()); + qemu_co_queue_do_restart(queue, false); +} + +bool qemu_co_enter_next(CoQueue *queue) +{ + Coroutine *next; + + next = QTAILQ_FIRST(&queue->entries); + if (!next) { + return false; + } + + QTAILQ_REMOVE(&queue->entries, next, co_queue_next); + qemu_coroutine_enter(next, NULL); + return true; +} + +bool qemu_co_queue_empty(CoQueue *queue) +{ + return QTAILQ_FIRST(&queue->entries) == NULL; +} + +void qemu_co_mutex_init(CoMutex *mutex) +{ + memset(mutex, 0, sizeof(*mutex)); + qemu_co_queue_init(&mutex->queue); +} + +void coroutine_fn qemu_co_mutex_lock(CoMutex *mutex) +{ + Coroutine *self = qemu_coroutine_self(); + + trace_qemu_co_mutex_lock_entry(mutex, self); + + while (mutex->locked) { + qemu_co_queue_wait(&mutex->queue); + } + + mutex->locked = true; + + trace_qemu_co_mutex_lock_return(mutex, self); +} + +void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex) +{ + Coroutine *self = qemu_coroutine_self(); + + trace_qemu_co_mutex_unlock_entry(mutex, self); + + assert(mutex->locked == true); + assert(qemu_in_coroutine()); + + mutex->locked = false; + qemu_co_queue_next(&mutex->queue); + + trace_qemu_co_mutex_unlock_return(mutex, self); +} + +void qemu_co_rwlock_init(CoRwlock *lock) +{ + memset(lock, 0, sizeof(*lock)); + qemu_co_queue_init(&lock->queue); +} + +void qemu_co_rwlock_rdlock(CoRwlock *lock) +{ + while (lock->writer) { + qemu_co_queue_wait(&lock->queue); + } + lock->reader++; +} + +void qemu_co_rwlock_unlock(CoRwlock *lock) +{ + assert(qemu_in_coroutine()); + if (lock->writer) { + lock->writer = false; + qemu_co_queue_restart_all(&lock->queue); + } else { + lock->reader--; + assert(lock->reader >= 0); + /* Wakeup only one waiting writer */ + if (!lock->reader) { + qemu_co_queue_next(&lock->queue); + } + } +} + +void qemu_co_rwlock_wrlock(CoRwlock *lock) +{ + while (lock->writer || lock->reader) { + qemu_co_queue_wait(&lock->queue); + } + lock->writer = true; +} diff --git a/util/qemu-coroutine-sleep.c b/util/qemu-coroutine-sleep.c new file mode 100644 index 0000000..b35db56 --- /dev/null +++ b/util/qemu-coroutine-sleep.c @@ -0,0 +1,41 @@ +/* + * QEMU coroutine sleep + * + * Copyright IBM, Corp. 2011 + * + * Authors: + * Stefan Hajnoczi + * + * This work is licensed under the terms of the GNU LGPL, version 2 or later. + * See the COPYING.LIB file in the top-level directory. + * + */ + +#include "qemu/coroutine.h" +#include "qemu/timer.h" +#include "block/aio.h" + +typedef struct CoSleepCB { + QEMUTimer *ts; + Coroutine *co; +} CoSleepCB; + +static void co_sleep_cb(void *opaque) +{ + CoSleepCB *sleep_cb = opaque; + + qemu_coroutine_enter(sleep_cb->co, NULL); +} + +void coroutine_fn co_aio_sleep_ns(AioContext *ctx, QEMUClockType type, + int64_t ns) +{ + CoSleepCB sleep_cb = { + .co = qemu_coroutine_self(), + }; + sleep_cb.ts = aio_timer_new(ctx, type, SCALE_NS, co_sleep_cb, &sleep_cb); + timer_mod(sleep_cb.ts, qemu_clock_get_ns(type) + ns); + qemu_coroutine_yield(); + timer_del(sleep_cb.ts); + timer_free(sleep_cb.ts); +} diff --git a/util/qemu-coroutine.c b/util/qemu-coroutine.c new file mode 100644 index 0000000..8953560 --- /dev/null +++ b/util/qemu-coroutine.c @@ -0,0 +1,146 @@ +/* + * QEMU coroutines + * + * Copyright IBM, Corp. 2011 + * + * Authors: + * Stefan Hajnoczi + * Kevin Wolf + * + * This work is licensed under the terms of the GNU LGPL, version 2 or later. + * See the COPYING.LIB file in the top-level directory. + * + */ + +#include "trace.h" +#include "qemu-common.h" +#include "qemu/thread.h" +#include "qemu/atomic.h" +#include "qemu/coroutine.h" +#include "qemu/coroutine_int.h" + +enum { + POOL_BATCH_SIZE = 64, +}; + +/** Free list to speed up creation */ +static QSLIST_HEAD(, Coroutine) release_pool = QSLIST_HEAD_INITIALIZER(pool); +static unsigned int release_pool_size; +static __thread QSLIST_HEAD(, Coroutine) alloc_pool = QSLIST_HEAD_INITIALIZER(pool); +static __thread unsigned int alloc_pool_size; +static __thread Notifier coroutine_pool_cleanup_notifier; + +static void coroutine_pool_cleanup(Notifier *n, void *value) +{ + Coroutine *co; + Coroutine *tmp; + + QSLIST_FOREACH_SAFE(co, &alloc_pool, pool_next, tmp) { + QSLIST_REMOVE_HEAD(&alloc_pool, pool_next); + qemu_coroutine_delete(co); + } +} + +Coroutine *qemu_coroutine_create(CoroutineEntry *entry) +{ + Coroutine *co = NULL; + + if (CONFIG_COROUTINE_POOL) { + co = QSLIST_FIRST(&alloc_pool); + if (!co) { + if (release_pool_size > POOL_BATCH_SIZE) { + /* Slow path; a good place to register the destructor, too. */ + if (!coroutine_pool_cleanup_notifier.notify) { + coroutine_pool_cleanup_notifier.notify = coroutine_pool_cleanup; + qemu_thread_atexit_add(&coroutine_pool_cleanup_notifier); + } + + /* This is not exact; there could be a little skew between + * release_pool_size and the actual size of release_pool. But + * it is just a heuristic, it does not need to be perfect. + */ + alloc_pool_size = atomic_xchg(&release_pool_size, 0); + QSLIST_MOVE_ATOMIC(&alloc_pool, &release_pool); + co = QSLIST_FIRST(&alloc_pool); + } + } + if (co) { + QSLIST_REMOVE_HEAD(&alloc_pool, pool_next); + alloc_pool_size--; + } + } + + if (!co) { + co = qemu_coroutine_new(); + } + + co->entry = entry; + QTAILQ_INIT(&co->co_queue_wakeup); + return co; +} + +static void coroutine_delete(Coroutine *co) +{ + co->caller = NULL; + + if (CONFIG_COROUTINE_POOL) { + if (release_pool_size < POOL_BATCH_SIZE * 2) { + QSLIST_INSERT_HEAD_ATOMIC(&release_pool, co, pool_next); + atomic_inc(&release_pool_size); + return; + } + if (alloc_pool_size < POOL_BATCH_SIZE) { + QSLIST_INSERT_HEAD(&alloc_pool, co, pool_next); + alloc_pool_size++; + return; + } + } + + qemu_coroutine_delete(co); +} + +void qemu_coroutine_enter(Coroutine *co, void *opaque) +{ + Coroutine *self = qemu_coroutine_self(); + CoroutineAction ret; + + trace_qemu_coroutine_enter(self, co, opaque); + + if (co->caller) { + fprintf(stderr, "Co-routine re-entered recursively\n"); + abort(); + } + + co->caller = self; + co->entry_arg = opaque; + ret = qemu_coroutine_switch(self, co, COROUTINE_ENTER); + + qemu_co_queue_run_restart(co); + + switch (ret) { + case COROUTINE_YIELD: + return; + case COROUTINE_TERMINATE: + trace_qemu_coroutine_terminate(co); + coroutine_delete(co); + return; + default: + abort(); + } +} + +void coroutine_fn qemu_coroutine_yield(void) +{ + Coroutine *self = qemu_coroutine_self(); + Coroutine *to = self->caller; + + trace_qemu_coroutine_yield(self, to); + + if (!to) { + fprintf(stderr, "Co-routine is yielding to no one\n"); + abort(); + } + + self->caller = NULL; + qemu_coroutine_switch(self, to, COROUTINE_YIELD); +} -- cgit v1.1