From 15693458c4bc0693fd63a50d60f35b628fcf4e29 Mon Sep 17 00:00:00 2001 From: "Steven Rostedt (Red Hat)" Date: Thu, 28 Feb 2013 19:59:17 -0500 Subject: tracing/ring-buffer: Move poll wake ups into ring buffer code Move the logic to wake up on ring buffer data into the ring buffer code itself. This simplifies the tracing code a lot and also has the added benefit that waiters on one of the instance buffers can be woken only when data is added to that instance instead of data added to any instance. Signed-off-by: Steven Rostedt --- kernel/trace/ring_buffer.c | 146 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 146 insertions(+) (limited to 'kernel/trace/ring_buffer.c') diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c index 7244acd..56b6ea3 100644 --- a/kernel/trace/ring_buffer.c +++ b/kernel/trace/ring_buffer.c @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -442,6 +443,12 @@ int ring_buffer_print_page_header(struct trace_seq *s) return ret; } +struct rb_irq_work { + struct irq_work work; + wait_queue_head_t waiters; + bool waiters_pending; +}; + /* * head_page == tail_page && head == tail then buffer is empty. */ @@ -476,6 +483,8 @@ struct ring_buffer_per_cpu { struct list_head new_pages; /* new pages to add */ struct work_struct update_pages_work; struct completion update_done; + + struct rb_irq_work irq_work; }; struct ring_buffer { @@ -495,6 +504,8 @@ struct ring_buffer { struct notifier_block cpu_notify; #endif u64 (*clock)(void); + + struct rb_irq_work irq_work; }; struct ring_buffer_iter { @@ -506,6 +517,118 @@ struct ring_buffer_iter { u64 read_stamp; }; +/* + * rb_wake_up_waiters - wake up tasks waiting for ring buffer input + * + * Schedules a delayed work to wake up any task that is blocked on the + * ring buffer waiters queue. + */ +static void rb_wake_up_waiters(struct irq_work *work) +{ + struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work); + + wake_up_all(&rbwork->waiters); +} + +/** + * ring_buffer_wait - wait for input to the ring buffer + * @buffer: buffer to wait on + * @cpu: the cpu buffer to wait on + * + * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon + * as data is added to any of the @buffer's cpu buffers. Otherwise + * it will wait for data to be added to a specific cpu buffer. + */ +void ring_buffer_wait(struct ring_buffer *buffer, int cpu) +{ + struct ring_buffer_per_cpu *cpu_buffer; + DEFINE_WAIT(wait); + struct rb_irq_work *work; + + /* + * Depending on what the caller is waiting for, either any + * data in any cpu buffer, or a specific buffer, put the + * caller on the appropriate wait queue. + */ + if (cpu == RING_BUFFER_ALL_CPUS) + work = &buffer->irq_work; + else { + cpu_buffer = buffer->buffers[cpu]; + work = &cpu_buffer->irq_work; + } + + + prepare_to_wait(&work->waiters, &wait, TASK_INTERRUPTIBLE); + + /* + * The events can happen in critical sections where + * checking a work queue can cause deadlocks. + * After adding a task to the queue, this flag is set + * only to notify events to try to wake up the queue + * using irq_work. + * + * We don't clear it even if the buffer is no longer + * empty. The flag only causes the next event to run + * irq_work to do the work queue wake up. The worse + * that can happen if we race with !trace_empty() is that + * an event will cause an irq_work to try to wake up + * an empty queue. + * + * There's no reason to protect this flag either, as + * the work queue and irq_work logic will do the necessary + * synchronization for the wake ups. The only thing + * that is necessary is that the wake up happens after + * a task has been queued. It's OK for spurious wake ups. + */ + work->waiters_pending = true; + + if ((cpu == RING_BUFFER_ALL_CPUS && ring_buffer_empty(buffer)) || + (cpu != RING_BUFFER_ALL_CPUS && ring_buffer_empty_cpu(buffer, cpu))) + schedule(); + + finish_wait(&work->waiters, &wait); +} + +/** + * ring_buffer_poll_wait - poll on buffer input + * @buffer: buffer to wait on + * @cpu: the cpu buffer to wait on + * @filp: the file descriptor + * @poll_table: The poll descriptor + * + * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon + * as data is added to any of the @buffer's cpu buffers. Otherwise + * it will wait for data to be added to a specific cpu buffer. + * + * Returns POLLIN | POLLRDNORM if data exists in the buffers, + * zero otherwise. + */ +int ring_buffer_poll_wait(struct ring_buffer *buffer, int cpu, + struct file *filp, poll_table *poll_table) +{ + struct ring_buffer_per_cpu *cpu_buffer; + struct rb_irq_work *work; + + if ((cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) || + (cpu != RING_BUFFER_ALL_CPUS && !ring_buffer_empty_cpu(buffer, cpu))) + return POLLIN | POLLRDNORM; + + if (cpu == RING_BUFFER_ALL_CPUS) + work = &buffer->irq_work; + else { + cpu_buffer = buffer->buffers[cpu]; + work = &cpu_buffer->irq_work; + } + + work->waiters_pending = true; + poll_wait(filp, &work->waiters, poll_table); + + if ((cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) || + (cpu != RING_BUFFER_ALL_CPUS && !ring_buffer_empty_cpu(buffer, cpu))) + return POLLIN | POLLRDNORM; + return 0; +} + /* buffer may be either ring_buffer or ring_buffer_per_cpu */ #define RB_WARN_ON(b, cond) \ ({ \ @@ -1061,6 +1184,7 @@ rb_allocate_cpu_buffer(struct ring_buffer *buffer, int nr_pages, int cpu) cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED; INIT_WORK(&cpu_buffer->update_pages_work, update_pages_handler); init_completion(&cpu_buffer->update_done); + init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters); bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()), GFP_KERNEL, cpu_to_node(cpu)); @@ -1156,6 +1280,8 @@ struct ring_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags, buffer->clock = trace_clock_local; buffer->reader_lock_key = key; + init_irq_work(&buffer->irq_work.work, rb_wake_up_waiters); + /* need at least two pages */ if (nr_pages < 2) nr_pages = 2; @@ -2610,6 +2736,22 @@ static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer, rb_end_commit(cpu_buffer); } +static __always_inline void +rb_wakeups(struct ring_buffer *buffer, struct ring_buffer_per_cpu *cpu_buffer) +{ + if (buffer->irq_work.waiters_pending) { + buffer->irq_work.waiters_pending = false; + /* irq_work_queue() supplies it's own memory barriers */ + irq_work_queue(&buffer->irq_work.work); + } + + if (cpu_buffer->irq_work.waiters_pending) { + cpu_buffer->irq_work.waiters_pending = false; + /* irq_work_queue() supplies it's own memory barriers */ + irq_work_queue(&cpu_buffer->irq_work.work); + } +} + /** * ring_buffer_unlock_commit - commit a reserved * @buffer: The buffer to commit to @@ -2629,6 +2771,8 @@ int ring_buffer_unlock_commit(struct ring_buffer *buffer, rb_commit(cpu_buffer, event); + rb_wakeups(buffer, cpu_buffer); + trace_recursive_unlock(); preempt_enable_notrace(); @@ -2801,6 +2945,8 @@ int ring_buffer_write(struct ring_buffer *buffer, rb_commit(cpu_buffer, event); + rb_wakeups(buffer, cpu_buffer); + ret = 0; out: preempt_enable_notrace(); -- cgit v1.1 From f1dc6725882b5ca54eb9a04436a3b47d58f2cbc7 Mon Sep 17 00:00:00 2001 From: "Steven Rostedt (Red Hat)" Date: Mon, 4 Mar 2013 17:33:05 -0500 Subject: ring-buffer: Init waitqueue for blocked readers The move of blocked readers to the ring buffer left out the init of the wait queue that is used. Tests missed this due to running stress tests against the buffers, which didn't allow for any readers to end up waiting. Running a simple read and wait triggered a bug. Signed-off-by: Steven Rostedt --- kernel/trace/ring_buffer.c | 2 ++ 1 file changed, 2 insertions(+) (limited to 'kernel/trace/ring_buffer.c') diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c index 56b6ea3..65fe2a4 100644 --- a/kernel/trace/ring_buffer.c +++ b/kernel/trace/ring_buffer.c @@ -1185,6 +1185,7 @@ rb_allocate_cpu_buffer(struct ring_buffer *buffer, int nr_pages, int cpu) INIT_WORK(&cpu_buffer->update_pages_work, update_pages_handler); init_completion(&cpu_buffer->update_done); init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters); + init_waitqueue_head(&cpu_buffer->irq_work.waiters); bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()), GFP_KERNEL, cpu_to_node(cpu)); @@ -1281,6 +1282,7 @@ struct ring_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags, buffer->reader_lock_key = key; init_irq_work(&buffer->irq_work.work, rb_wake_up_waiters); + init_waitqueue_head(&buffer->irq_work.waiters); /* need at least two pages */ if (nr_pages < 2) -- cgit v1.1 From f5eb5588262cab7232ed1d77cf612b327db50767 Mon Sep 17 00:00:00 2001 From: "Steven Rostedt (Red Hat)" Date: Thu, 7 Mar 2013 09:27:42 -0500 Subject: ring-buffer: Do not use schedule_work_on() for current CPU The ring buffer updates when done while the ring buffer is active, needs to be completed on the CPU that is used for the ring buffer per_cpu buffer. To accomplish this, schedule_work_on() is used to schedule work on the given CPU. Now there's no reason to use schedule_work_on() if the process doing the update happens to be on the CPU that it is processing. It has already filled the requirement. Instead, just do the work and continue. This is needed for tracing_snapshot_alloc() where it may be called really early in boot, where the work queues have not been set up yet. Signed-off-by: Steven Rostedt --- kernel/trace/ring_buffer.c | 33 +++++++++++++++++++++++++++------ 1 file changed, 27 insertions(+), 6 deletions(-) (limited to 'kernel/trace/ring_buffer.c') diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c index 65fe2a4..d1c85c5 100644 --- a/kernel/trace/ring_buffer.c +++ b/kernel/trace/ring_buffer.c @@ -1679,11 +1679,22 @@ int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size, if (!cpu_buffer->nr_pages_to_update) continue; - if (cpu_online(cpu)) + /* The update must run on the CPU that is being updated. */ + preempt_disable(); + if (cpu == smp_processor_id() || !cpu_online(cpu)) { + rb_update_pages(cpu_buffer); + cpu_buffer->nr_pages_to_update = 0; + } else { + /* + * Can not disable preemption for schedule_work_on() + * on PREEMPT_RT. + */ + preempt_enable(); schedule_work_on(cpu, &cpu_buffer->update_pages_work); - else - rb_update_pages(cpu_buffer); + preempt_disable(); + } + preempt_enable(); } /* wait for all the updates to complete */ @@ -1721,12 +1732,22 @@ int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size, get_online_cpus(); - if (cpu_online(cpu_id)) { + preempt_disable(); + /* The update must run on the CPU that is being updated. */ + if (cpu_id == smp_processor_id() || !cpu_online(cpu_id)) + rb_update_pages(cpu_buffer); + else { + /* + * Can not disable preemption for schedule_work_on() + * on PREEMPT_RT. + */ + preempt_enable(); schedule_work_on(cpu_id, &cpu_buffer->update_pages_work); wait_for_completion(&cpu_buffer->update_done); - } else - rb_update_pages(cpu_buffer); + preempt_disable(); + } + preempt_enable(); cpu_buffer->nr_pages_to_update = 0; put_online_cpus(); -- cgit v1.1 From 6c43e554a2a5c1f2caf1733d46719bc58de3e37b Mon Sep 17 00:00:00 2001 From: "Steven Rostedt (Red Hat)" Date: Fri, 15 Mar 2013 11:32:53 -0400 Subject: ring-buffer: Add ring buffer startup selftest When testing my large changes to the ftrace system, there was a bug that looked like the ring buffer was dropping events. I wrote up a quick integrity checker of the ring buffer to see if it was. Although the bug ended up being something stupid I did in ftrace, and had nothing to do with the ring buffer, I figured if I spent the time to write up this test, I might as well include it in the kernel. I cleaned it up a bit, as the original version was rather ugly. Not saying this version is pretty, but it's a beauty queen compared to what I original wrote. To enable the start up test, set CONFIG_RING_BUFFER_STARTUP_TEST. Note, it runs for 10 seconds, so it will slow your boot time by at least 10 more seconds. What it does is documented in both the comments and the Kconfig help. Signed-off-by: Steven Rostedt --- kernel/trace/ring_buffer.c | 319 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 319 insertions(+) (limited to 'kernel/trace/ring_buffer.c') diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c index d1c85c5..e5472f7 100644 --- a/kernel/trace/ring_buffer.c +++ b/kernel/trace/ring_buffer.c @@ -12,10 +12,12 @@ #include #include #include +#include /* for self test */ #include #include #include #include +#include #include #include #include @@ -4634,3 +4636,320 @@ static int rb_cpu_notify(struct notifier_block *self, return NOTIFY_OK; } #endif + +#ifdef CONFIG_RING_BUFFER_STARTUP_TEST +/* + * This is a basic integrity check of the ring buffer. + * Late in the boot cycle this test will run when configured in. + * It will kick off a thread per CPU that will go into a loop + * writing to the per cpu ring buffer various sizes of data. + * Some of the data will be large items, some small. + * + * Another thread is created that goes into a spin, sending out + * IPIs to the other CPUs to also write into the ring buffer. + * this is to test the nesting ability of the buffer. + * + * Basic stats are recorded and reported. If something in the + * ring buffer should happen that's not expected, a big warning + * is displayed and all ring buffers are disabled. + */ +static struct task_struct *rb_threads[NR_CPUS] __initdata; + +struct rb_test_data { + struct ring_buffer *buffer; + unsigned long events; + unsigned long bytes_written; + unsigned long bytes_alloc; + unsigned long bytes_dropped; + unsigned long events_nested; + unsigned long bytes_written_nested; + unsigned long bytes_alloc_nested; + unsigned long bytes_dropped_nested; + int min_size_nested; + int max_size_nested; + int max_size; + int min_size; + int cpu; + int cnt; +}; + +static struct rb_test_data rb_data[NR_CPUS] __initdata; + +/* 1 meg per cpu */ +#define RB_TEST_BUFFER_SIZE 1048576 + +static char rb_string[] __initdata = + "abcdefghijklmnopqrstuvwxyz1234567890!@#$%^&*()?+\\" + "?+|:';\",.<>/?abcdefghijklmnopqrstuvwxyz1234567890" + "!@#$%^&*()?+\\?+|:';\",.<>/?abcdefghijklmnopqrstuv"; + +static bool rb_test_started __initdata; + +struct rb_item { + int size; + char str[]; +}; + +static __init int rb_write_something(struct rb_test_data *data, bool nested) +{ + struct ring_buffer_event *event; + struct rb_item *item; + bool started; + int event_len; + int size; + int len; + int cnt; + + /* Have nested writes different that what is written */ + cnt = data->cnt + (nested ? 27 : 0); + + /* Multiply cnt by ~e, to make some unique increment */ + size = (data->cnt * 68 / 25) % (sizeof(rb_string) - 1); + + len = size + sizeof(struct rb_item); + + started = rb_test_started; + /* read rb_test_started before checking buffer enabled */ + smp_rmb(); + + event = ring_buffer_lock_reserve(data->buffer, len); + if (!event) { + /* Ignore dropped events before test starts. */ + if (started) { + if (nested) + data->bytes_dropped += len; + else + data->bytes_dropped_nested += len; + } + return len; + } + + event_len = ring_buffer_event_length(event); + + if (RB_WARN_ON(data->buffer, event_len < len)) + goto out; + + item = ring_buffer_event_data(event); + item->size = size; + memcpy(item->str, rb_string, size); + + if (nested) { + data->bytes_alloc_nested += event_len; + data->bytes_written_nested += len; + data->events_nested++; + if (!data->min_size_nested || len < data->min_size_nested) + data->min_size_nested = len; + if (len > data->max_size_nested) + data->max_size_nested = len; + } else { + data->bytes_alloc += event_len; + data->bytes_written += len; + data->events++; + if (!data->min_size || len < data->min_size) + data->max_size = len; + if (len > data->max_size) + data->max_size = len; + } + + out: + ring_buffer_unlock_commit(data->buffer, event); + + return 0; +} + +static __init int rb_test(void *arg) +{ + struct rb_test_data *data = arg; + + while (!kthread_should_stop()) { + rb_write_something(data, false); + data->cnt++; + + set_current_state(TASK_INTERRUPTIBLE); + /* Now sleep between a min of 100-300us and a max of 1ms */ + usleep_range(((data->cnt % 3) + 1) * 100, 1000); + } + + return 0; +} + +static __init void rb_ipi(void *ignore) +{ + struct rb_test_data *data; + int cpu = smp_processor_id(); + + data = &rb_data[cpu]; + rb_write_something(data, true); +} + +static __init int rb_hammer_test(void *arg) +{ + while (!kthread_should_stop()) { + + /* Send an IPI to all cpus to write data! */ + smp_call_function(rb_ipi, NULL, 1); + /* No sleep, but for non preempt, let others run */ + schedule(); + } + + return 0; +} + +static __init int test_ringbuffer(void) +{ + struct task_struct *rb_hammer; + struct ring_buffer *buffer; + int cpu; + int ret = 0; + + pr_info("Running ring buffer tests...\n"); + + buffer = ring_buffer_alloc(RB_TEST_BUFFER_SIZE, RB_FL_OVERWRITE); + if (WARN_ON(!buffer)) + return 0; + + /* Disable buffer so that threads can't write to it yet */ + ring_buffer_record_off(buffer); + + for_each_online_cpu(cpu) { + rb_data[cpu].buffer = buffer; + rb_data[cpu].cpu = cpu; + rb_data[cpu].cnt = cpu; + rb_threads[cpu] = kthread_create(rb_test, &rb_data[cpu], + "rbtester/%d", cpu); + if (WARN_ON(!rb_threads[cpu])) { + pr_cont("FAILED\n"); + ret = -1; + goto out_free; + } + + kthread_bind(rb_threads[cpu], cpu); + wake_up_process(rb_threads[cpu]); + } + + /* Now create the rb hammer! */ + rb_hammer = kthread_run(rb_hammer_test, NULL, "rbhammer"); + if (WARN_ON(!rb_hammer)) { + pr_cont("FAILED\n"); + ret = -1; + goto out_free; + } + + ring_buffer_record_on(buffer); + /* + * Show buffer is enabled before setting rb_test_started. + * Yes there's a small race window where events could be + * dropped and the thread wont catch it. But when a ring + * buffer gets enabled, there will always be some kind of + * delay before other CPUs see it. Thus, we don't care about + * those dropped events. We care about events dropped after + * the threads see that the buffer is active. + */ + smp_wmb(); + rb_test_started = true; + + set_current_state(TASK_INTERRUPTIBLE); + /* Just run for 10 seconds */; + schedule_timeout(10 * HZ); + + kthread_stop(rb_hammer); + + out_free: + for_each_online_cpu(cpu) { + if (!rb_threads[cpu]) + break; + kthread_stop(rb_threads[cpu]); + } + if (ret) { + ring_buffer_free(buffer); + return ret; + } + + /* Report! */ + pr_info("finished\n"); + for_each_online_cpu(cpu) { + struct ring_buffer_event *event; + struct rb_test_data *data = &rb_data[cpu]; + struct rb_item *item; + unsigned long total_events; + unsigned long total_dropped; + unsigned long total_written; + unsigned long total_alloc; + unsigned long total_read = 0; + unsigned long total_size = 0; + unsigned long total_len = 0; + unsigned long total_lost = 0; + unsigned long lost; + int big_event_size; + int small_event_size; + + ret = -1; + + total_events = data->events + data->events_nested; + total_written = data->bytes_written + data->bytes_written_nested; + total_alloc = data->bytes_alloc + data->bytes_alloc_nested; + total_dropped = data->bytes_dropped + data->bytes_dropped_nested; + + big_event_size = data->max_size + data->max_size_nested; + small_event_size = data->min_size + data->min_size_nested; + + pr_info("CPU %d:\n", cpu); + pr_info(" events: %ld\n", total_events); + pr_info(" dropped bytes: %ld\n", total_dropped); + pr_info(" alloced bytes: %ld\n", total_alloc); + pr_info(" written bytes: %ld\n", total_written); + pr_info(" biggest event: %d\n", big_event_size); + pr_info(" smallest event: %d\n", small_event_size); + + if (RB_WARN_ON(buffer, total_dropped)) + break; + + ret = 0; + + while ((event = ring_buffer_consume(buffer, cpu, NULL, &lost))) { + total_lost += lost; + item = ring_buffer_event_data(event); + total_len += ring_buffer_event_length(event); + total_size += item->size + sizeof(struct rb_item); + if (memcmp(&item->str[0], rb_string, item->size) != 0) { + pr_info("FAILED!\n"); + pr_info("buffer had: %.*s\n", item->size, item->str); + pr_info("expected: %.*s\n", item->size, rb_string); + RB_WARN_ON(buffer, 1); + ret = -1; + break; + } + total_read++; + } + if (ret) + break; + + ret = -1; + + pr_info(" read events: %ld\n", total_read); + pr_info(" lost events: %ld\n", total_lost); + pr_info(" total events: %ld\n", total_lost + total_read); + pr_info(" recorded len bytes: %ld\n", total_len); + pr_info(" recorded size bytes: %ld\n", total_size); + if (total_lost) + pr_info(" With dropped events, record len and size may not match\n" + " alloced and written from above\n"); + if (!total_lost) { + if (RB_WARN_ON(buffer, total_len != total_alloc || + total_size != total_written)) + break; + } + if (RB_WARN_ON(buffer, total_lost + total_read != total_events)) + break; + + ret = 0; + } + if (!ret) + pr_info("Ring buffer PASSED!\n"); + + ring_buffer_free(buffer); + return 0; +} + +late_initcall(test_ringbuffer); +#endif /* CONFIG_RING_BUFFER_STARTUP_TEST */ -- cgit v1.1