Skip to content

Commit

Permalink
Separate main and back-channel audio threads
Browse files Browse the repository at this point in the history
In order to simplify dealing with delays, which is important for
PCM draining, we are going to separate audio threads for main and
back-channel streams. This will allow to use very simple IO loop
and offload multiplexing to the OS.
  • Loading branch information
arkq committed Jan 9, 2021
1 parent 2e60c5b commit 74b4d2f
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 32 deletions.
55 changes: 32 additions & 23 deletions src/ba-transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,13 @@ static const char *transport_get_dbus_path_type(

static int transport_pcm_init(
struct ba_transport_pcm *pcm,
struct ba_transport *t,
struct ba_transport_thread *th,
enum ba_transport_pcm_mode mode) {

struct ba_transport *t = th->t;

pcm->t = t;
pcm->th = th;
pcm->mode = mode;
pcm->fd = -1;

Expand Down Expand Up @@ -105,6 +108,8 @@ static int transport_thread_init(
th->pipe[0] = -1;
th->pipe[1] = -1;

pthread_mutex_init(&th->mutex, NULL);

if (pipe(th->pipe) == -1)
return -1;

Expand Down Expand Up @@ -140,6 +145,7 @@ static void transport_thread_free(
close(th->pipe[0]);
if (th->pipe[1] != -1)
close(th->pipe[1]);
pthread_mutex_destroy(&th->mutex);
}

/**
Expand Down Expand Up @@ -170,7 +176,10 @@ static struct ba_transport *transport_new(

t->bt_fd = -1;

if (transport_thread_init(&t->thread, t) != 0)
err = 0;
err |= transport_thread_init(&t->thread, t);
err |= transport_thread_init(&t->thread_bc, t);
if (err != 0)
goto fail;

if ((t->bluez_dbus_owner = strdup(dbus_owner)) == NULL)
Expand Down Expand Up @@ -220,12 +229,12 @@ struct ba_transport *ba_transport_new_a2dp(
t->a2dp.configuration = g_memdup(configuration, codec->capabilities_size);
t->a2dp.state = BLUEZ_A2DP_TRANSPORT_STATE_IDLE;

transport_pcm_init(&t->a2dp.pcm, t, is_sink ?
transport_pcm_init(&t->a2dp.pcm, &t->thread, is_sink ?
BA_TRANSPORT_PCM_MODE_SOURCE : BA_TRANSPORT_PCM_MODE_SINK);
t->a2dp.pcm.soft_volume = !config.a2dp.volume;
t->a2dp.pcm.max_bt_volume = 127;

transport_pcm_init(&t->a2dp.pcm_bc, t, is_sink ?
transport_pcm_init(&t->a2dp.pcm_bc, &t->thread_bc, is_sink ?
BA_TRANSPORT_PCM_MODE_SINK : BA_TRANSPORT_PCM_MODE_SOURCE);
t->a2dp.pcm_bc.soft_volume = !config.a2dp.volume;
t->a2dp.pcm_bc.max_bt_volume = 127;
Expand Down Expand Up @@ -271,10 +280,10 @@ struct ba_transport *ba_transport_new_sco(

t->type = type;

transport_pcm_init(&t->sco.spk_pcm, t, BA_TRANSPORT_PCM_MODE_SINK);
transport_pcm_init(&t->sco.spk_pcm, &t->thread, BA_TRANSPORT_PCM_MODE_SINK);
t->sco.spk_pcm.max_bt_volume = 15;

transport_pcm_init(&t->sco.mic_pcm, t, BA_TRANSPORT_PCM_MODE_SOURCE);
transport_pcm_init(&t->sco.mic_pcm, &t->thread, BA_TRANSPORT_PCM_MODE_SOURCE);
t->sco.mic_pcm.max_bt_volume = 15;

t->acquire = transport_acquire_bt_sco;
Expand Down Expand Up @@ -342,10 +351,11 @@ void ba_transport_destroy(struct ba_transport *t) {
}

/* If the transport is active, prior to releasing resources, we have to
* terminate the IO thread (or at least make sure it is not running any
* more). Not doing so might result in an undefined behavior or even a
* race condition (closed and reused file descriptor). */
* terminate the IO threads (or at least make sure they are not running
* any more). Not doing so might result in an undefined behavior or even
* a race condition (closed and reused file descriptor). */
transport_thread_cancel(&t->thread);
transport_thread_cancel(&t->thread_bc);

/* terminate on-going PCM connections - exit PCM controllers */
if (t->type.profile & BA_TRANSPORT_PROFILE_MASK_A2DP) {
Expand Down Expand Up @@ -399,6 +409,7 @@ void ba_transport_unref(struct ba_transport *t) {
}

transport_thread_free(&t->thread);
transport_thread_free(&t->thread_bc);

pthread_mutex_destroy(&t->mutex);
free(t->bluez_dbus_owner);
Expand Down Expand Up @@ -626,7 +637,8 @@ void ba_transport_set_codec(

int ba_transport_start(struct ba_transport *t) {

if (!pthread_equal(t->thread.id, config.main_thread))
if (!pthread_equal(t->thread.id, config.main_thread) ||
!pthread_equal(t->thread_bc.id, config.main_thread))
return 0;

debug("Starting transport: %s", ba_transport_type_to_string(t->type));
Expand All @@ -642,6 +654,7 @@ int ba_transport_start(struct ba_transport *t) {

int ba_transport_stop(struct ba_transport *t) {
transport_thread_cancel(&t->thread);
transport_thread_cancel(&t->thread_bc);
return 0;
}

Expand Down Expand Up @@ -728,27 +741,25 @@ int ba_transport_pcm_volume_update(struct ba_transport_pcm *pcm) {
}

int ba_transport_pcm_pause(struct ba_transport_pcm *pcm) {
ba_transport_thread_send_signal(&pcm->t->thread, BA_TRANSPORT_SIGNAL_PCM_PAUSE);
ba_transport_thread_send_signal(pcm->th, BA_TRANSPORT_SIGNAL_PCM_PAUSE);
debug("PCM paused: %d", pcm->fd);
return 0;
}

int ba_transport_pcm_resume(struct ba_transport_pcm *pcm) {
ba_transport_thread_send_signal(&pcm->t->thread, BA_TRANSPORT_SIGNAL_PCM_RESUME);
ba_transport_thread_send_signal(pcm->th, BA_TRANSPORT_SIGNAL_PCM_RESUME);
debug("PCM resumed: %d", pcm->fd);
return 0;
}

int ba_transport_pcm_drain(struct ba_transport_pcm *pcm) {

struct ba_transport *t = pcm->t;

if (pthread_equal(t->thread.id, config.main_thread))
if (pthread_equal(pcm->th->id, config.main_thread))
return errno = ESRCH, -1;

pthread_mutex_lock(&pcm->synced_mtx);

ba_transport_thread_send_signal(&t->thread, BA_TRANSPORT_SIGNAL_PCM_SYNC);
ba_transport_thread_send_signal(pcm->th, BA_TRANSPORT_SIGNAL_PCM_SYNC);
pthread_cond_wait(&pcm->synced, &pcm->synced_mtx);

pthread_mutex_unlock(&pcm->synced_mtx);
Expand Down Expand Up @@ -1034,16 +1045,14 @@ void ba_transport_thread_cleanup(struct ba_transport_thread *th) {
}

int ba_transport_thread_cleanup_lock(struct ba_transport_thread *th) {
struct ba_transport *t = th->t;
int ret = pthread_mutex_lock(&t->mutex);
t->cleanup_lock = true;
int ret = pthread_mutex_lock(&th->mutex);
th->cleanup_lock = true;
return ret;
}

int ba_transport_thread_cleanup_unlock(struct ba_transport_thread *th) {
struct ba_transport *t = th->t;
if (!t->cleanup_lock)
if (!th->cleanup_lock)
return 0;
t->cleanup_lock = false;
return pthread_mutex_unlock(&t->mutex);
th->cleanup_lock = false;
return pthread_mutex_unlock(&th->mutex);
}
11 changes: 8 additions & 3 deletions src/ba-transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ struct ba_transport_pcm {

/* backward reference to transport */
struct ba_transport *t;
/* associated transport thread */
struct ba_transport_thread *th;

/* PCM stream operation mode */
enum ba_transport_pcm_mode mode;
Expand Down Expand Up @@ -139,10 +141,14 @@ struct ba_transport_pcm {
struct ba_transport_thread {
/* backward reference to transport */
struct ba_transport *t;
/* guard PCM running on this thread */
pthread_mutex_t mutex;
/* actual thread ID */
pthread_t id;
/* notification PIPE */
int pipe[2];
/* indicates cleanup lock */
bool cleanup_lock;
};

struct ba_transport {
Expand Down Expand Up @@ -174,6 +180,8 @@ struct ba_transport {

/* main thread for audio processing */
struct ba_transport_thread thread;
/* thread for back-channel processing */
struct ba_transport_thread thread_bc;

union {

Expand Down Expand Up @@ -221,9 +229,6 @@ struct ba_transport {

};

/* indicates cleanup lock */
bool cleanup_lock;

/* callback functions for self-management */
int (*acquire)(struct ba_transport *);
int (*release)(struct ba_transport *);
Expand Down
5 changes: 2 additions & 3 deletions src/bluealsa-dbus.c
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,6 @@ static gboolean bluealsa_pcm_controller(GIOChannel *ch, GIOCondition condition,
(void)condition;

struct ba_transport_pcm *pcm = (struct ba_transport_pcm *)userdata;
struct ba_transport *t = pcm->t;
char command[32];
size_t len;

Expand Down Expand Up @@ -327,7 +326,7 @@ static gboolean bluealsa_pcm_controller(GIOChannel *ch, GIOCondition condition,
return TRUE;
case G_IO_STATUS_EOF:
ba_transport_pcm_release(pcm);
ba_transport_thread_send_signal(&t->thread, BA_TRANSPORT_SIGNAL_PCM_CLOSE);
ba_transport_thread_send_signal(pcm->th, BA_TRANSPORT_SIGNAL_PCM_CLOSE);
/* remove channel from watch */
return FALSE;
}
Expand All @@ -340,7 +339,7 @@ static void bluealsa_pcm_open(GDBusMethodInvocation *inv) {
void *userdata = g_dbus_method_invocation_get_user_data(inv);
struct ba_transport_pcm *pcm = (struct ba_transport_pcm *)userdata;
const bool is_sink = pcm->mode == BA_TRANSPORT_PCM_MODE_SINK;
struct ba_transport_thread *th = &pcm->t->thread;
struct ba_transport_thread *th = pcm->th;
struct ba_transport *t = pcm->t;
int pcm_fds[4] = { -1, -1, -1, -1 };
bool locked = false;
Expand Down
6 changes: 4 additions & 2 deletions src/ofono.c
Original file line number Diff line number Diff line change
Expand Up @@ -401,12 +401,14 @@ static void ofono_agent_new_connection(GDBusMethodInvocation *inv) {
t->mtu_read = t->mtu_write = hci_sco_get_mtu(fd);

ba_transport_set_codec(t, codec);

bluealsa_dbus_pcm_update(&t->sco.spk_pcm,
BA_DBUS_PCM_UPDATE_SAMPLING | BA_DBUS_PCM_UPDATE_CODEC);
ba_transport_thread_send_signal(t->sco.spk_pcm.th, BA_TRANSPORT_SIGNAL_PING);

bluealsa_dbus_pcm_update(&t->sco.mic_pcm,
BA_DBUS_PCM_UPDATE_SAMPLING | BA_DBUS_PCM_UPDATE_CODEC);

ba_transport_thread_send_signal(&t->thread, BA_TRANSPORT_SIGNAL_PING);
ba_transport_thread_send_signal(t->sco.mic_pcm.th, BA_TRANSPORT_SIGNAL_PING);

g_dbus_method_invocation_return_value(inv, NULL);
goto final;
Expand Down
3 changes: 2 additions & 1 deletion src/sco.c
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ static void *sco_dispatcher_thread(struct ba_adapter *a) {
t->mtu_read = t->mtu_write = hci_sco_get_mtu(fd);
fd = -1;

ba_transport_thread_send_signal(&t->thread, BA_TRANSPORT_SIGNAL_PING);
ba_transport_thread_send_signal(t->sco.spk_pcm.th, BA_TRANSPORT_SIGNAL_PING);
ba_transport_thread_send_signal(t->sco.mic_pcm.th, BA_TRANSPORT_SIGNAL_PING);

cleanup:
if (d != NULL)
Expand Down

0 comments on commit 74b4d2f

Please sign in to comment.