Skip to content

Commit

Permalink
Support for 3.3 and N:M threads (#18)
Browse files Browse the repository at this point in the history
Fix #17 
Paired with @byroot 

This PR adds support for Ruby 3.3 by using the new
`rb_internal_thread_specific_get` which allows accessing data specific
to a Ruby thread without requiring the GVL to be locked. It also handles
N:M threads automatically.

More info here:
ruby/ruby@352a885

In Ruby 3.3, the thread events don't necessarily run from the thread
they correspond to, so the thread corresponding to the event is sent in
`event_data`, and now `GT_LOCAL_STATE` needs to take the thread too.

To handle GC, the thread specific data is wrapped into a TypedData
struct and stored in the Ruby thread as a instance variable. This
requires getting the GVL, so we have an `allocate` flag on
`GT_LOCAL_STATE` too, which is false when we're not running from the
thread and can't allocate. The hook only runs from its corresponding
thread for the `RUBY_INTERNAL_THREAD_EVENT_STARTED` and
`RUBY_INTERNAL_THREAD_EVENT_RESUMED` events, so we can allocate there.

If the local state hasn't been initialized yet, we just return early.
This means we would skipping events from already existing threads that
haven't been resumed yet. To avoid this, we initialize all threads when
tracing starts. We do this from Ruby, which ensures we have the GVL. We
added a spec for that, which passes on main on 3.2, but not on 3.3.

To support N:M threads (and differentiate events from different threads
but the same native thread), instead of returning the native thread id,
we know return the address of the Ruby thread in `tid` (on Ruby 3.3).
  • Loading branch information
ivoanjo committed Feb 21, 2024
2 parents 3c98c99 + 7c83ec6 commit 4ffa17f
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 39 deletions.
10 changes: 9 additions & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,22 @@ jobs:
matrix:
os: [ubuntu-latest, macos-latest, windows-latest]
ruby: ['3.2', '3.3', head, debug]
mn_threads: ["0", "1"]
exclude:
- os: windows-latest
ruby: debug
- ruby: "3.2"
mn_threads: "1"
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v2
- uses: ruby/setup-ruby@v1
with:
ruby-version: ${{ matrix.ruby }}
bundler-cache: true # runs 'bundle install' and caches installed gems automatically
- run: bundle exec rake
- name: bundle exec rake
run: |
ruby -v
bundle exec rake
env:
RUBY_MN_THREADS: ${{ matrix.mn_threads }}
2 changes: 2 additions & 0 deletions ext/gvl_tracing_native_extension/extconf.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
have_header("pthread.h")
have_func("pthread_getname_np", "pthread.h")
have_func("pthread_threadid_np", "pthread.h")
have_func("rb_internal_thread_specific_get", "ruby/thread.h") # 3.3+

append_cflags("-Werror-implicit-function-declaration")
append_cflags("-Wunused-parameter")
append_cflags("-Wold-style-definition")
Expand Down
169 changes: 133 additions & 36 deletions ext/gvl_tracing_native_extension/gvl_tracing.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,21 +49,64 @@
#define UNUSED_ARG
#endif

static VALUE tracing_start(VALUE _self, VALUE output_path);
static VALUE tracing_stop(VALUE _self);
static double timestamp_microseconds(void);
static void set_native_thread_id(void);
static void render_event(const char *event_name);
static void on_thread_event(rb_event_flag_t event, const rb_internal_thread_event_data_t *_unused1, void *_unused2);
static void on_gc_event(VALUE tpval, void *_unused1);
static VALUE mark_sleeping(VALUE _self);
typedef struct {
bool current_thread_seen;
unsigned int current_thread_serial;
uint64_t thread_id;
VALUE thread;
rb_event_flag_t previous_state; // Used to coalesce similar events
bool sleeping; // Used to track when a thread is sleeping
} thread_local_state;

#ifdef HAVE_RB_INTERNAL_THREAD_SPECIFIC_GET // 3.3+

static int thread_storage_key = 0;

static size_t thread_local_state_memsize(UNUSED_ARG const void *data) {
return sizeof(thread_local_state);
}

static void thread_local_state_mark(void *data) {
thread_local_state *state = (thread_local_state *)data;
rb_gc_mark(state->thread); // Marking thread to make sure it stays pinned
}

static const rb_data_type_t thread_local_state_type = {
.wrap_struct_name = "GvlTracing::__threadLocal",
.function = {
.dmark = thread_local_state_mark,
.dfree = RUBY_DEFAULT_FREE,
.dsize = thread_local_state_memsize,
},
.flags = RUBY_TYPED_FREE_IMMEDIATELY | RUBY_TYPED_WB_PROTECTED,
};

static inline thread_local_state *GT_LOCAL_STATE(VALUE thread, bool allocate) {
thread_local_state *state = rb_internal_thread_specific_get(thread, thread_storage_key);
if (!state && allocate) {
VALUE wrapper = TypedData_Make_Struct(rb_cObject, thread_local_state, &thread_local_state_type, state);
state->thread = thread;
rb_thread_local_aset(thread, rb_intern("__gvl_tracing_local_state"), wrapper);
rb_internal_thread_specific_set(thread, thread_storage_key, state);
RB_GC_GUARD(wrapper);
}
return state;
}

#define GT_EVENT_LOCAL_STATE(event_data, allocate) GT_LOCAL_STATE(event_data->thread, allocate)
// Must only be called from a thread holding the GVL
#define GT_CURRENT_THREAD_LOCAL_STATE() GT_LOCAL_STATE(rb_thread_current(), true)

#else // < 3.3

// Thread-local state
static _Thread_local bool current_thread_seen = false;
static _Thread_local unsigned int current_thread_serial = 0;
static _Thread_local uint64_t thread_id = 0;
static _Thread_local rb_event_flag_t previous_state = 0; // Used to coalesce similar events
static _Thread_local bool sleeping = false; // Used to track when a thread is sleeping
static _Thread_local thread_local_state __thread_local_state = { 0 };

#define GT_LOCAL_STATE(thread, allocate) (&__thread_local_state)
#define GT_EVENT_LOCAL_STATE(event_data, allocate) (&__thread_local_state)
#define GT_CURRENT_THREAD_LOCAL_STATE() (&__thread_local_state)

#endif

// Global mutable state
static rb_atomic_t thread_serial = 0;
Expand All @@ -73,46 +116,81 @@ static double started_tracing_at_microseconds = 0;
static int64_t process_id = 0;
static VALUE gc_tracepoint = Qnil;

static VALUE tracing_init_local_storage(VALUE, VALUE);
static VALUE tracing_start(VALUE _self, VALUE output_path);
static VALUE tracing_stop(VALUE _self);
static double timestamp_microseconds(void);
static void set_native_thread_id(thread_local_state *);
static void render_event(thread_local_state *, const char *event_name);
static void on_thread_event(rb_event_flag_t event, const rb_internal_thread_event_data_t *_unused1, void *_unused2);
static void on_gc_event(VALUE tpval, void *_unused1);
static VALUE mark_sleeping(VALUE _self);

void Init_gvl_tracing_native_extension(void) {
#ifdef HAVE_RB_INTERNAL_THREAD_SPECIFIC_GET // 3.3+
thread_storage_key = rb_internal_thread_specific_key_create();
#endif

rb_global_variable(&gc_tracepoint);

VALUE gvl_tracing_module = rb_define_module("GvlTracing");

rb_define_singleton_method(gvl_tracing_module, "_init_local_storage", tracing_init_local_storage, 1);
rb_define_singleton_method(gvl_tracing_module, "_start", tracing_start, 1);
rb_define_singleton_method(gvl_tracing_module, "_stop", tracing_stop, 0);
rb_define_singleton_method(gvl_tracing_module, "mark_sleeping", mark_sleeping, 0);
}

static inline void initialize_thread_id(void) {
current_thread_seen = true;
current_thread_serial = RUBY_ATOMIC_FETCH_ADD(thread_serial, 1);
set_native_thread_id();
static inline void initialize_thread_id(thread_local_state *state) {
state->current_thread_seen = true;
state->current_thread_serial = RUBY_ATOMIC_FETCH_ADD(thread_serial, 1);
set_native_thread_id(state);
}

static inline void render_thread_metadata(void) {
static inline void render_thread_metadata(thread_local_state *state) {
char native_thread_name_buffer[64] = "(unnamed)";

#ifdef HAVE_PTHREAD_GETNAME_NP
pthread_getname_np(pthread_self(), native_thread_name_buffer, sizeof(native_thread_name_buffer));
#endif

#ifdef HAVE_RB_INTERNAL_THREAD_SPECIFIC_GET
uint64_t thread_id = (uint64_t)state->thread;
// For JSON, values above only 53 bits are interoperable
#if SIZEOF_VALUE > 4
thread_id = ((uint32_t)(thread_id >> 32) ^ (uint32_t)(thread_id & 0xFFFFFFFF));
#endif
#else
uint64_t thread_id = state->thread_id;
#endif
fprintf(output_file,
" {\"ph\": \"M\", \"pid\": %"PRId64", \"tid\": %"PRIu64", \"name\": \"thread_name\", \"args\": {\"name\": \"%s\"}},\n",
process_id, thread_id, native_thread_name_buffer);
}

static VALUE tracing_init_local_storage(UNUSED_ARG VALUE _self, VALUE threads) {
#ifdef HAVE_RB_INTERNAL_THREAD_SPECIFIC_GET // 3.3+
for (long i = 0, len = RARRAY_LEN(threads); i < len; i++) {
VALUE thread = RARRAY_AREF(threads, i);
GT_LOCAL_STATE(thread, true);
}
#endif
return Qtrue;
}

static VALUE tracing_start(UNUSED_ARG VALUE _self, VALUE output_path) {
Check_Type(output_path, T_STRING);

if (output_file != NULL) rb_raise(rb_eRuntimeError, "Already started");
output_file = fopen(StringValuePtr(output_path), "w");
if (output_file == NULL) rb_syserr_fail(errno, "Failed to open GvlTracing output file");

thread_local_state *state = GT_CURRENT_THREAD_LOCAL_STATE();
started_tracing_at_microseconds = timestamp_microseconds();
process_id = getpid();

fprintf(output_file, "[\n");
render_event("started_tracing");
render_event(state, "started_tracing");

current_hook = rb_internal_thread_add_event_hook(
on_thread_event,
Expand Down Expand Up @@ -143,11 +221,12 @@ static VALUE tracing_start(UNUSED_ARG VALUE _self, VALUE output_path) {
static VALUE tracing_stop(UNUSED_ARG VALUE _self) {
if (output_file == NULL) rb_raise(rb_eRuntimeError, "Tracing not running");

thread_local_state *state = GT_CURRENT_THREAD_LOCAL_STATE();
rb_internal_thread_remove_event_hook(current_hook);
rb_tracepoint_disable(gc_tracepoint);
gc_tracepoint = Qnil;

render_event("stopped_tracing");
render_event(state, "stopped_tracing");
// closing the json syntax in the output file is handled in GvlTracing.stop code

if (fclose(output_file) != 0) rb_syserr_fail(errno, "Failed to close GvlTracing output file");
Expand All @@ -163,29 +242,29 @@ static double timestamp_microseconds(void) {
return (current_monotonic.tv_nsec / 1000.0) + (current_monotonic.tv_sec * 1000.0 * 1000.0);
}

static void set_native_thread_id(void) {
static void set_native_thread_id(thread_local_state *state) {
uint64_t native_thread_id = 0;

#ifdef HAVE_PTHREAD_THREADID_NP
pthread_threadid_np(pthread_self(), &native_thread_id);
#elif HAVE_GETTID
native_thread_id = gettid();
#else
native_thread_id = current_thread_serial; // TODO: Better fallback for Windows?
native_thread_id = state->current_thread_serial; // TODO: Better fallback for Windows?
#endif

thread_id = native_thread_id;
state->thread_id = native_thread_id;
}

// Render output using trace event format for perfetto:
// https://chromium.googlesource.com/catapult/+/refs/heads/main/docs/trace-event-format.md
static void render_event(const char *event_name) {
static void render_event(thread_local_state *state, const char *event_name) {
// Event data
double now_microseconds = timestamp_microseconds() - started_tracing_at_microseconds;

if (!current_thread_seen) {
initialize_thread_id();
render_thread_metadata();
if (!state->current_thread_seen) {
initialize_thread_id(state);
render_thread_metadata(state);
}

// Each event is converted into two events in the output: one that signals the end of the previous event
Expand All @@ -195,6 +274,16 @@ static void render_event(const char *event_name) {
// Important note: We've observed some rendering issues in perfetto if the tid or pid are numbers that are "too big",
// see https://github.com/ivoanjo/gvl-tracing/pull/4#issuecomment-1196463364 for an example.

#ifdef HAVE_RB_INTERNAL_THREAD_SPECIFIC_GET
uint64_t thread_id = (uint64_t)state->thread;
// Thread IDs are 32-bit
#if SIZEOF_VALUE > 4
thread_id = ((uint32_t)(thread_id >> 32) ^ (uint32_t)(thread_id & 0xFFFFFFFF));
#endif
#else
uint64_t thread_id = state->thread_id;
#endif

fprintf(output_file,
// Finish previous duration
" {\"ph\": \"E\", \"pid\": %"PRId64", \"tid\": %"PRIu64", \"ts\": %f},\n" \
Expand All @@ -207,22 +296,29 @@ static void render_event(const char *event_name) {
);
}

static void on_thread_event(rb_event_flag_t event_id, UNUSED_ARG const rb_internal_thread_event_data_t *_unused1, UNUSED_ARG void *_unused2) {
static void on_thread_event(rb_event_flag_t event_id, const rb_internal_thread_event_data_t *event_data, UNUSED_ARG void *_unused2) {
thread_local_state *state = GT_EVENT_LOCAL_STATE(event_data,
// These events are guaranteed to hold the GVL, so they can allocate
event_id & (RUBY_INTERNAL_THREAD_EVENT_STARTED | RUBY_INTERNAL_THREAD_EVENT_RESUMED));
if (!state) return;
#ifdef HAVE_RB_INTERNAL_THREAD_SPECIFIC_GET
if (!state->thread) state->thread = event_data->thread;
#endif
// In some cases, Ruby seems to emit multiple suspended events for the same thread in a row (e.g. when multiple threads)
// are waiting on a Thread::ConditionVariable.new that gets signaled. We coalesce these events to make the resulting
// timeline easier to see.
//
// I haven't observed other situations where we'd want to coalesce events, but we may apply this to all events in the
// future. One annoying thing to remember when generalizing this is how to reset the `previous_state` across multiple
// start/stop calls to GvlTracing.
if (event_id == RUBY_INTERNAL_THREAD_EVENT_SUSPENDED && event_id == previous_state) return;
previous_state = event_id;
if (event_id == RUBY_INTERNAL_THREAD_EVENT_SUSPENDED && event_id == state->previous_state) return;
state->previous_state = event_id;

if (event_id == RUBY_INTERNAL_THREAD_EVENT_SUSPENDED && sleeping) {
render_event("sleeping");
if (event_id == RUBY_INTERNAL_THREAD_EVENT_SUSPENDED && state->sleeping) {
render_event(state, "sleeping");
return;
} else {
sleeping = false;
state->sleeping = false;
}

const char* event_name = "bug_unknown_event";
Expand All @@ -233,20 +329,21 @@ static void on_thread_event(rb_event_flag_t event_id, UNUSED_ARG const rb_intern
case RUBY_INTERNAL_THREAD_EVENT_STARTED: event_name = "started"; break;
case RUBY_INTERNAL_THREAD_EVENT_EXITED: event_name = "died"; break;
};
render_event(event_name);
render_event(state, event_name);
}

static void on_gc_event(VALUE tpval, UNUSED_ARG void *_unused1) {
const char* event_name = "bug_unknown_event";
thread_local_state *state = GT_LOCAL_STATE(rb_thread_current(), false); // no alloc during GC
switch (rb_tracearg_event_flag(rb_tracearg_from_tracepoint(tpval))) {
case RUBY_INTERNAL_EVENT_GC_ENTER: event_name = "gc"; break;
// TODO: is it possible the thread wasn't running? Might need to save the last state.
case RUBY_INTERNAL_EVENT_GC_EXIT: event_name = "running"; break;
}
render_event(event_name);
render_event(state, event_name);
}

static VALUE mark_sleeping(UNUSED_ARG VALUE _self) {
sleeping = true;
GT_CURRENT_THREAD_LOCAL_STATE()->sleeping = true;
return Qnil;
}
1 change: 1 addition & 0 deletions lib/gvl-tracing.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class << self

def start(file)
_start(file)
_init_local_storage(Thread.list)
@path = file

return unless block_given?
Expand Down
24 changes: 22 additions & 2 deletions spec/gvl_tracing_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@

describe "order of events" do
it "first and last events are in a consistent order" do
pending "Ruby 3.3 support is WIP" unless RUBY_VERSION.start_with?("3.2.")

GvlTracing.start(trace_path) do
[Thread.new {}, Thread.new {}].each(&:join)
end
Expand All @@ -71,4 +69,26 @@
expect(second.last.name).to eq("died")
end
end

describe "thread already started" do
it "has events that would require the GVL" do
started = Queue.new
finish = Queue.new
thread = Thread.new do
started << true
finish.pop
end
started.pop
GvlTracing.start(trace_path) do
finish << true
thread.join
end

trace = PerfettoTrace.new(trace_path)
traces = trace.events_by_thread
# skip the main thread
first = traces[traces.keys[1]].filter_map(&:name)
expect(first).to include("wants_gvl")
end
end
end

0 comments on commit 4ffa17f

Please sign in to comment.