Make GGML threads spawn 10x faster
This change causes GGML threads to be recycled. That way there's less of
a gap between predictions in the tracing diagram. It has the most impact
on smaller models like TinyLLaMA, where I'm seeing a ~15% boost in tokens
per second when generating text. The new llamafiler embeddings server is
now running ~25% faster, handling ~1000 requests per second on my
workstation. This change also boosts my TinyLLaMA prefill speed by ~20%,
by letting CONT be multi-threaded, which Slaren apparently discovered
upstream last month.
jart committed Aug 2, 2024
1 parent 2699f6b commit cda83f8
Showing 13 changed files with 406 additions and 43 deletions.
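
For orientation, the heart of this change is the new llamafile/pool.cpp added below: instead of a pthread_create/pthread_join pair per prediction, GGML's ggml_thread_* macros now map onto a task API whose worker threads park on an idle list and get reused. A minimal caller-side sketch, assuming only the signatures visible in this diff (the work() function and values are illustrative, not part of the commit):

    // Hedged sketch of the new task API; mirrors how ggml_graph_compute() drives it.
    #include "llamafile/pool.h"
    #include <cstdio>

    static void *work(void *arg) {
        int *n = static_cast<int *>(arg);
        *n *= 2; // stand-in for a chunk of graph computation
        return arg;
    }

    int main() {
        int x = 21;
        llamafile_task *task;
        // Reuses a parked worker thread if one is idle; otherwise spawns a fresh one.
        if (llamafile_task_create(&task, work, &x))
            return 1;
        void *res;
        llamafile_task_join(task, &res); // blocks until work() returns, then frees the task
        std::printf("%d\n", *static_cast<int *>(res));
        return 0;
    }
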
34 changes: 16 additions & 18 deletions llama.cpp/ggml.c
@@ -44,6 +44,7 @@ SOFTWARE.");
#include "llamafile/thread.h"
#include "llamafile/crash.h"
#include "llamafile/trace.h"
#include "llamafile/pool.h"

#include <alloca.h>
#include <assert.h>
@@ -1651,7 +1652,7 @@ struct ggml_compute_state_shared {
void* abort_callback_data;
};

typedef pthread_t ggml_thread_t;
typedef llamafile_task_t ggml_thread_t;

struct ggml_compute_state {
_Atomic(ggml_thread_t) thrd;
@@ -13302,6 +13303,7 @@ GGML_CALL void ggml_rope_yarn_corr_dims(
dims[1] = MIN(n_dims - 1, end);
}

__target_clones("avx2") // [jart]
static void ggml_compute_forward_rope_f32(
const struct ggml_compute_params * params,
struct ggml_tensor * dst,
@@ -18355,10 +18357,11 @@ typedef int ggml_lock_t;

#define GGML_LOCK_INITIALIZER 0

typedef pthread_t ggml_thread_t;
typedef llamafile_task_t ggml_thread_t;

#define ggml_thread_create llamafile_thread_create // [jart]
#define ggml_thread_join pthread_join
#define ggml_thread_create llamafile_task_create // [jart]
#define ggml_thread_cancel llamafile_task_cancel
#define ggml_thread_join llamafile_task_join

#else

@@ -18382,10 +18385,11 @@ typedef int ggml_lock_t;

#define GGML_LOCK_INITIALIZER 0

typedef pthread_t ggml_thread_t;
typedef llamafile_task_t ggml_thread_t;

#define ggml_thread_create pthread_create
#define ggml_thread_join pthread_join
#define ggml_thread_create llamafile_task_create // [jart]
#define ggml_thread_cancel llamafile_task_cancel
#define ggml_thread_join llamafile_task_join

#endif

@@ -18484,6 +18488,7 @@ static int ggml_get_n_tasks(struct ggml_tensor * node, int n_threads, int n_cur_
switch (node->op) {
case GGML_OP_CPY:
case GGML_OP_DUP:
case GGML_OP_CONT: // [jart] don't move me
case GGML_OP_ADD:
case GGML_OP_ADD1:
case GGML_OP_ACC:
@@ -18568,7 +18573,6 @@ static int ggml_get_n_tasks(struct ggml_tensor * node, int n_threads, int n_cur_
} break;
case GGML_OP_SCALE:
case GGML_OP_SET:
case GGML_OP_CONT:
case GGML_OP_RESHAPE:
case GGML_OP_VIEW:
case GGML_OP_PERMUTE:
@@ -19185,10 +19189,10 @@ static void ggml_compute_canceled(void *arg) {
struct ggml_compute_cleanup *cleanup = arg;
clear_numa_thread_affinity();
for (int j = 1; j < cleanup->n_threads; j++) {
pthread_t t;
ggml_thread_t t;
if ((t = atomic_exchange_explicit(&cleanup->workers[j].thrd, 0,
memory_order_relaxed))) {
pthread_cancel(t);
ggml_thread_cancel(t);
const int rc = ggml_thread_join(t, NULL);
GGML_ASSERT(rc == 0);
}
@@ -19241,14 +19245,8 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
.is_main_thread = false, // [jart]
};

pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setstacksize(&attr, 128 * 1024);
pthread_attr_setguardsize(&attr, sysconf(_SC_PAGESIZE));
pthread_attr_setsigaltstacksize_np(&attr, sysconf(_SC_MINSIGSTKSZ) + 16384);
const int rc = ggml_thread_create((pthread_t *)&workers[j].thrd, &attr,
const int rc = ggml_thread_create((ggml_thread_t *)&workers[j].thrd,
ggml_graph_compute_thread, &workers[j]);
pthread_attr_destroy(&attr);
GGML_ASSERT(rc == 0);
UNUSED(rc);
}
@@ -19276,7 +19274,7 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
int cs;
pthread_setcancelstate(PTHREAD_CANCEL_MASKED, &cs);
for (int j = 1; j < n_threads; j++) {
pthread_t t;
ggml_thread_t t;
if ((t = atomic_exchange_explicit(&workers[j].thrd, 0,
memory_order_relaxed))) {
const int rc = ggml_thread_join(t, NULL);
28 changes: 20 additions & 8 deletions llamafile/BUILD.mk
@@ -42,14 +42,16 @@ o/$(MODE)/llamafile/tokenize: \
o/$(MODE)/llama.cpp/llama.cpp.a

.PHONY: o/$(MODE)/llamafile
o/$(MODE)/llamafile: \
$(LLAMAFILE_OBJS) \
o/$(MODE)/llamafile/server \
o/$(MODE)/llamafile/simple \
o/$(MODE)/llamafile/zipalign \
o/$(MODE)/llamafile/zipcheck \
o/$(MODE)/llamafile/tokenize \
o/$(MODE)/llamafile/addnl \
o/$(MODE)/llamafile: \
$(LLAMAFILE_OBJS) \
o/$(MODE)/llamafile/server \
o/$(MODE)/llamafile/simple \
o/$(MODE)/llamafile/zipalign \
o/$(MODE)/llamafile/zipcheck \
o/$(MODE)/llamafile/tokenize \
o/$(MODE)/llamafile/addnl \
o/$(MODE)/llamafile/pool_test.runs \
o/$(MODE)/llamafile/pool_cancel_test.runs \

################################################################################
# microarchitectures
@@ -141,6 +143,16 @@ o/$(MODE)/llamafile/tinyblas_cpu_sgemm_arm82.o: \
################################################################################
# testing

o/$(MODE)/llamafile/pool_test: \
o/$(MODE)/llamafile/pool_test.o \
o/$(MODE)/llamafile/crash.o \
o/$(MODE)/llamafile/pool.o \

o/$(MODE)/llamafile/pool_cancel_test: \
o/$(MODE)/llamafile/pool_cancel_test.o \
o/$(MODE)/llamafile/crash.o \
o/$(MODE)/llamafile/pool.o \

o/$(MODE)/llamafile/thread_test: \
o/$(MODE)/llamafile/thread_test.o \
o/$(MODE)/llamafile/crash.o \
6 changes: 3 additions & 3 deletions llamafile/core_manager.cpp
@@ -36,8 +36,8 @@ static void unlock_mutex(void *arg) {
}

int CoreManager::acquire(int need, int greed) {
unassert(need >= 1);
unassert(greed >= need);
npassert(need >= 1);
npassert(greed >= need);

int got = 0;

@@ -80,5 +80,5 @@ void CoreManager::release(int count) {
}
pthread_cond_signal(&cv_);
pthread_mutex_unlock(&mu_);
unassert(ok);
npassert(ok);
}
2 changes: 1 addition & 1 deletion llamafile/llamafile.c
@@ -330,7 +330,7 @@ size_t llamafile_tell(struct llamafile *file) {
if (!file->fp)
return file->position;
long ret = ftell(file->fp);
unassert(ret != -1); // shouldn't fail because we seeked earlier
npassert(ret != -1); // shouldn't fail because we seeked earlier
return (size_t)ret;
}

212 changes: 212 additions & 0 deletions llamafile/pool.cpp
@@ -0,0 +1,212 @@
// -*- mode:c++;indent-tabs-mode:nil;c-basic-offset:4;coding:utf-8 -*-
// vi: set et ft=cpp ts=4 sts=4 sw=4 fenc=utf-8 :vi
//
// Copyright 2024 Mozilla Foundation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "pool.h"

#include <assert.h>
#include <cosmo.h>
#include <pthread.h>
#include <sched.h>
#include <stdatomic.h>
#include <unistd.h>

#include "threadlocal.h"

struct llamafile_thread;
static void llamafile_thread_canceled(llamafile_thread *);
static ThreadLocal<llamafile_thread> g_key(llamafile_thread_canceled);

struct llamafile_task {
pthread_cond_t cv = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mu = PTHREAD_MUTEX_INITIALIZER;
void *(*func)(void *);
void *arg;
void *res;
pthread_t th = -1;
};

struct llamafile_thread {
pthread_cond_t cv = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mu = PTHREAD_MUTEX_INITIALIZER;
llamafile_task *task;
llamafile_thread *next;
pthread_t th;
};

static atomic_int g_active;
static _Atomic(llamafile_thread *) g_idle;

static void unlock_mutex(void *arg) {
pthread_mutex_t *mu = (pthread_mutex_t *)arg;
pthread_mutex_unlock(mu);
}

static void idle_push(llamafile_thread *thread) {
int backoff = 0;
thread->next = atomic_load_explicit(&g_idle, memory_order_relaxed);
while (!atomic_compare_exchange_weak_explicit(&g_idle, &thread->next, thread,
memory_order_acq_rel, memory_order_relaxed))
backoff = pthread_delay_np(&g_idle, backoff);
}

static llamafile_thread *idle_pop(void) {
int backoff = 0;
llamafile_thread *thread;
for (;;) {
if ((thread = atomic_load_explicit(&g_idle, memory_order_relaxed))) {
if (atomic_compare_exchange_weak_explicit(&g_idle, &thread, thread->next,
memory_order_acq_rel, memory_order_relaxed))
return thread;
backoff = pthread_delay_np(&g_idle, backoff);
} else {
return nullptr;
}
}
}

static void cancel_task(llamafile_task *task) {
pthread_mutex_lock(&task->mu);
task->res = PTHREAD_CANCELED;
task->th = 0;
pthread_cond_signal(&task->cv);
pthread_mutex_unlock(&task->mu);
}

static void llamafile_thread_canceled(llamafile_thread *thread) {
thread->th = 0;
cancel_task(thread->task);
delete thread;
--g_active;
}

static void *llamafile_thread_worker(void *arg) {
errno_t err;
llamafile_thread *thread = (llamafile_thread *)arg;

++g_active;
g_key.set(thread);
do {
void *res = thread->task->func(thread->task->arg);
pthread_setcancelstate(PTHREAD_CANCEL_MASKED, 0);

pthread_mutex_lock(&thread->task->mu);
thread->task->res = res;
thread->task->th = 0;
pthread_cond_signal(&thread->task->cv);
pthread_mutex_unlock(&thread->task->mu);

pthread_cleanup_push(unlock_mutex, &thread->mu);
pthread_mutex_lock(&thread->mu);
thread->task = nullptr;
idle_push(thread);
while (!thread->task) {
err = pthread_cond_wait(&thread->cv, &thread->mu);
if (err == ECANCELED)
break;
}
pthread_cleanup_pop(true);
pthread_setcancelstate(PTHREAD_CANCEL_DEFERRED, 0);
} while (err != ECANCELED);

if (thread->task)
cancel_task(thread->task);

thread->th = 0;
g_key.set(nullptr);
delete thread;
--g_active;

return 0;
}

static errno_t llamafile_thread_create(llamafile_task *task) {
llamafile_thread *thread = new llamafile_thread;
thread->task = task;
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setstacksize(&attr, 128 * 1024);
pthread_attr_setguardsize(&attr, sysconf(_SC_PAGESIZE));
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
pthread_attr_setsigaltstacksize_np(&attr, sysconf(_SC_MINSIGSTKSZ) + 16384);
errno_t err = pthread_create(&thread->th, &attr, llamafile_thread_worker, thread);
pthread_attr_destroy(&attr);
if (!err) {
task->th = thread->th;
} else {
delete thread;
}
return err;
}

errno_t llamafile_task_create(llamafile_task **out_task, void *(*func)(void *), void *arg) {
llamafile_task *task = new llamafile_task;
task->func = func;
task->arg = arg;
errno_t err;
llamafile_thread *thread;
if ((thread = idle_pop())) {
pthread_mutex_lock(&thread->mu);
thread->task = task;
task->th = thread->th;
pthread_cond_signal(&thread->cv);
pthread_mutex_unlock(&thread->mu);
err = 0;
} else {
err = llamafile_thread_create(task);
}
if (!err) {
*out_task = task;
} else {
delete task;
}
return err;
}

errno_t llamafile_task_join(llamafile_task *task, void **out_res) {
pthread_cleanup_push(unlock_mutex, &task->mu);
pthread_mutex_lock(&task->mu);
while (task->th)
pthread_cond_wait(&task->cv, &task->mu);
pthread_cleanup_pop(true);
if (out_res)
*out_res = task->res;
delete task;
return 0;
}

errno_t llamafile_task_cancel(llamafile_task *task) {
errno_t err = 0;
if (task->th)
err = pthread_cancel(task->th);
return err;
}

void llamafile_task_shutdown(void) {
llamafile_thread *thread;
while ((thread = idle_pop()))
if (thread->th)
pthread_cancel(thread->th);
int backoff = 0;
while (g_active)
backoff = pthread_delay_np(&g_idle, backoff);
}

static struct llamafile_tasks {
~llamafile_tasks(void) {
llamafile_task_shutdown();
}
} g_tasks;
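
One more hedged sketch, this time of the cancellation contract implied by cancel_task() and llamafile_task_join() above: cancelling a task that is still running should make join report PTHREAD_CANCELED instead of the worker's return value. The sleeping worker and the timing assumption are illustrative; the real coverage lives in the pool_cancel_test target added in BUILD.mk.

    // Hedged sketch of the cancel path; assumes the cancel lands while sleeper() is still blocked.
    #include "llamafile/pool.h"
    #include <cassert>
    #include <pthread.h>
    #include <unistd.h>

    static void *sleeper(void *arg) {
        usleep(100000); // usleep() is a cancellation point
        return arg;
    }

    int main() {
        llamafile_task *task;
        if (llamafile_task_create(&task, sleeper, nullptr))
            return 1;
        llamafile_task_cancel(task);     // forwards to pthread_cancel() on the worker thread
        void *res;
        llamafile_task_join(task, &res); // wakes once cancel_task() publishes the result
        assert(res == PTHREAD_CANCELED);
        return 0;
    }
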