Skip to content

Commit

Permalink
Merge branch 'gladia' of github.com:bigbluebutton/bbb-transcription-c…
Browse files Browse the repository at this point in the history
…ontroller into main
  • Loading branch information
lfzawacki committed Nov 24, 2023
2 parents 6d5c44f + a170c1e commit 4e2714d
Show file tree
Hide file tree
Showing 8 changed files with 371 additions and 108 deletions.
237 changes: 159 additions & 78 deletions app.js
Original file line number Diff line number Diff line change
@@ -1,25 +1,45 @@
'use strict'

function tryParseJson(str) {
try {
return JSON.parse(str);
} catch (ex) {
return {};
}
}
const Logger = require('./lib/logger');

const config = require('config');
const EventEmitter = require('events').EventEmitter;
const Logger = require('./lib/logger');
const LOG_PREFIX = "[bbb-transcript-manager]";
const { fork } = require('child_process');
const fs = require('fs');

let GLADIA_PROXY_PROCESS;
const runGladiaProxy = () => {
const outputFile = 'gladia-proxy.log';

const outputStream = fs.createWriteStream(outputFile);

outputStream.on('open', () => {
// Spawn the child process
GLADIA_PROXY_PROCESS = fork('gladia-proxy.js', [], {
stdio: [null, outputStream, outputStream, 'ipc']
});

GLADIA_PROXY_PROCESS.on('exit', (code, signal) => {
Logger.info(`Closing Gladia proxy code: ${code} signal: ${signal}`);
});
});

Logger.info("Starting Gladia proxy");
}

if (config.get('gladia.proxy.enabled')) {
runGladiaProxy();
}

const { tryParseJSON } = require('./lib/utils');

const EventEmitter = require('events').EventEmitter;
const C = require('./lib/Constants');
const BigBlueButtonGW = require('./lib/bbb-gw');

const bbbGW = new BigBlueButtonGW();

const socketStatus = {};
const socketIsStopping = {};
const userChannels = {};

const REDIS_CHANNEL = config.get('redis.publishChannel')

Expand All @@ -32,10 +52,41 @@ bbbGW.on('UserSpeechLocaleChangedEvtMsg', (header, payload) => {
const { meetingId, userId } = header;
const { provider, locale } = payload;

console.log("Speech changed", userId, provider, locale);
Logger.info("Speech changed " + userId + ' ' + provider + ' ' + locale);

setProvider(userId, provider, () => {
setUserLocale(userId, locale, () => {

let channelId = userChannels[userId];
if (channelId && socketStatus[channelId]) {
stopAudioFork(channelId);
setTimeout(() => {
startAudioFork(channelId, userId);
}, 1000);
}
});
});
});

bbbGW.on('UserSpeechOptionsChangedEvtMsg', (header, payload) => {
const { meetingId, userId } = header;
const { partialUtterances, minUtteranceLength } = payload;

Logger.info("User speech options changed " + ' ' + meetingId + ' ' + userId + ' ' + partialUtterances + ' ' + minUtteranceLength);

setUserPartialUtterance(userId, partialUtterances, () => {
setUserMinUtteranceLength(userId, minUtteranceLength, () => {

let channelId = userChannels[userId];
if (channelId && socketStatus[channelId]) {
stopAudioFork(channelId);
setTimeout(() => {
startAudioFork(channelId, userId);
}, 1000);
}

setProvider(userId, provider);
setUserLocale(userId, locale);
});
});
});

const REDIS_VOICE_ID_KEY = 'bbb-transcription-manager_voiceToMeeting';
Expand All @@ -56,6 +107,24 @@ const setUserLocale = (userId, locale, cb) => {
bbbGW.setKey(REDIS_USER_LOCALE_KEY + '_' + userId, locale, cb);
};

const REDIS_USER_PARTIAL_UTTERANCE_KEY = 'bbb-transcription-manager_partial_utterance';
const getUserPartialUtterance = (userId, cb) => {
bbbGW.getKey(REDIS_USER_PARTIAL_UTTERANCE_KEY + '_' + userId, cb);
};

const setUserPartialUtterance = (userId, partialUtterance, cb) => {
bbbGW.setKey(REDIS_USER_PARTIAL_UTTERANCE_KEY + '_' + userId, partialUtterance, cb);
};

const REDIS_USER_MIN_UTTERANCE_LENGTH_KEY = 'bbb-transcription-manager_min_utterance_length';
const getUserMinUtteranceLength = (userId, cb) => {
bbbGW.getKey(REDIS_USER_MIN_UTTERANCE_LENGTH_KEY + '_' + userId, cb);
};

const setUserMinUtteranceLength = (userId, minUtteranceLength, cb) => {
bbbGW.setKey(REDIS_USER_MIN_UTTERANCE_LENGTH_KEY + '_' + userId, minUtteranceLength, cb);
};

const REDIS_TRANSCRIPTION_PROVIDER_KEY = 'bbb-transcription-manager_provider';
const getProvider = (userId, cb) => {
bbbGW.getKey(REDIS_TRANSCRIPTION_PROVIDER_KEY + '_' + userId, cb);
Expand All @@ -70,13 +139,17 @@ const eslWrapper = new EslWrapper();

const SAMPLE_RATE = config.get("sampleRate");

const INCLUDE_PARTIAL_RESULTS = config.get("includePartialResults");

const getServerUrl = (userId, cb) => {

getProvider(userId, (err, provider) => {
getUserLocale(userId, (err, locale) => {

if (provider && provider != '' && config.has(provider) && locale && locale != '') {
return cb(config.get(provider + '.servers.' + locale), provider);
if (provider && provider != '' && locale && locale != '') {
const serverUrl = config.get(provider === 'gladia' ? 'gladia.server' : provider + '.servers.' + locale);

return cb(serverUrl, provider, locale);
} else {
return cb(null);
}
Expand Down Expand Up @@ -114,94 +187,91 @@ const makeMessage = (meetingId, userId, locale, transcript, result) => {
};

const startAudioFork = (channelId, userId) => {
getServerUrl(userId, (serverUrl, provider) => {
if (!serverUrl) {
Logger.warn("No provider set, not transcribing");
return;
}

const initialMessage = JSON.parse(config.get(provider + '.startMessage'));
if (provider === 'vosk') {
initialMessage.config.sample_rate = SAMPLE_RATE + '000';
}

if (socketIsStopping[channelId]) {
socketIsStopping[channelId] = false;
}

if (!socketStatus[channelId]) {
eslWrapper._executeCommand(`uuid_audio_fork ${channelId} start ${serverUrl} mono ${SAMPLE_RATE}k ${JSON.stringify(initialMessage)}`);
socketStatus[channelId] = true;
}
Logger.info(`Start mod_audio_fork connection ${channelId} ${userId}`);

getServerUrl(userId, (serverUrl, provider, language) => {
getUserPartialUtterance(userId, (err, partialUtterances) => {
getUserMinUtteranceLength(userId, (err, minUtteranceLength) => {

if (!serverUrl) {
Logger.warn("No provider set, not transcribing");
return;
}

const initialMessage = JSON.parse(config.get(provider + '.startMessage'));

if (provider === 'vosk') {
initialMessage.config.sample_rate = SAMPLE_RATE + '000';
}

if (provider === 'gladia') {
initialMessage.sample_rate = parseInt(SAMPLE_RATE + '000')
initialMessage.language = language.slice(0,2);
initialMessage.partialUtterances = partialUtterances;
initialMessage.minUtteranceLength = minUtteranceLength;
}

if (!socketStatus[channelId]) {
eslWrapper._executeCommand(`uuid_audio_fork ${channelId} start ${serverUrl} mono ${SAMPLE_RATE}k ${JSON.stringify(initialMessage)}`);
socketStatus[channelId] = true;
userChannels[userId] = channelId;
}
});
});
});
};

const stopAudioFork = (channelId, userId) => {
getProvider(userId, (err, provider) => {

let endMessage;

if (!provider) {
Logger.warn("No provider set, not stopping transcription");
endMessage = JSON.parse(config.get('vosk.endMessage'));
} else {
endMessage = JSON.parse(config.get(provider + '.endMessage'));
}

if (socketStatus[channelId]) {
if (!socketIsStopping[channelId]) {
socketIsStopping[channelId] = true;
} else {
eslWrapper._executeCommand(`uuid_audio_fork ${channelId} stop ${JSON.stringify(endMessage)}`);
const stopAudioFork = (channelId) => {
Logger.info(`Stop mod_audio_fork connection ${channelId}`);
const endMessage = JSON.parse(config.get('vosk.endMessage'));

socketStatus[channelId] = false;
socketIsStopping[channelId] = false;
}
if (socketStatus[channelId]) {
try{
eslWrapper._executeCommand(`uuid_audio_fork ${channelId} stop ${JSON.stringify(endMessage)}`);
} catch (e) {
logger.error("Socket already closed");
}
});
socketStatus[channelId] = false;
}
};

let prev_transcription = '';
eslWrapper.onModAudioForkJSON((msg) => {
const channelId = msg.getHeader('Channel-Call-UUID')
eslWrapper.onModAudioForkJSON((msg, channelId, userId) => {

getVoiceToMeeting(msg.getHeader('variable_conference_name'), (err, meetingId) => {

const userId = msg.getHeader('Caller-Username').split('_').slice(0,2).join('_');
getUserLocale(userId, (err, locale) => {
const ignore = [ '', 'the']

const body = tryParseJson(msg.body);
const body = tryParseJSON(msg.body);
const transcription = body.text || body.partial;

if (body.text) {
Logger.info(`Final text is: ${body.text}`);
if (body.partial && !INCLUDE_PARTIAL_RESULTS) {
Logger.debug('Discard partial utterance', body.partial);
return;
}

if ((ignore.includes(transcription) || transcription == prev_transcription) && !body.text) {
return;
if (body.text) {
Logger.info(`Final text is: ${body.text}`);
}

prev_transcription = transcription;
const result = Boolean(body.text);
const payload = makeMessage(meetingId, userId, locale, transcription, result);
const payload = makeMessage(meetingId, userId, body.locale || locale, transcription, result);

bbbGW.publish(JSON.stringify(payload), C.TO_AKKA_APPS_CHAN_2x);

if (socketIsStopping[channelId] && result) {
stopAudioFork(channelId);
}
});
});
});

});
eslWrapper.onModAudioForkDisconnect((msg, channelId, userId) => {
Logger.info(`mod_audio_fork connection dropped ${channelId} ${userId}`);
});

const handleChannelAnswer = (channelId, callId) => {
Logger.info(`FS: Associating channel ${channelId} ${callId}`);
const handleChannelAnswer = (channelId, callId, userId) => {
Logger.info(`FS: Associating channel ${channelId} ${callId} userId: ${userId}`);
startAudioFork(channelId, userId);
}

const handleChannelHangup = (channelId, callId) => {
Logger.info(`FS: channel hangup ${channelId} ${callId}`);
stopAudioFork(channelId);
}

const handleFloorChanged = (roomId, newFloorMemberId) => {
Expand All @@ -210,12 +280,10 @@ const handleFloorChanged = (roomId, newFloorMemberId) => {

const handleStartTalking = (channelId, userId) => {
Logger.info(`FS: Start talking ${channelId} userId: ${userId}`);
startAudioFork(channelId, userId);
}

const handleStopTalking = (channelId, userId) => {
Logger.info(`FS: Stop Talking ${channelId} userId: ${userId}`);
stopAudioFork(channelId, userId);
}

eslWrapper.on(EslWrapper.EVENTS.CHANNEL_ANSWER, handleChannelAnswer);
Expand All @@ -227,3 +295,16 @@ eslWrapper.on(EslWrapper.EVENTS.MUTED, handleStopTalking);

eslWrapper._connect();

const exitCleanup = () => {
Logger.info('Closing process, cleaning up.');

if (GLADIA_PROXY_PROCESS) {
Logger.info('Killing gladia proxy');
GLADIA_PROXY_PROCESS.kill('SIGINT');
}
setTimeout(() => process.exit(), 1000);
}

process.on('SIGINT', exitCleanup);
process.on('SIGQUIT', exitCleanup);
process.on('SIGTERM', exitCleanup);
14 changes: 13 additions & 1 deletion config/default.example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ redis:
# password: foobared
publishChannel: 'from-akka-apps-redis-channel'

includePartialResults: true

# in mhz
# Valid values are '16' and '8', but 8 seems to be
# too low to get accurate transcriptions
Expand All @@ -23,7 +25,7 @@ sampleRate: '16'
# language, see the example below

vosk:
startMessage: '{"config": { "sample_rate": ""} }'
startMessage: '{"config": { "sample_rate": "" } }'
endMessage: '{"eof" : 1 }'
servers:
en-US: wss://HOST/voskEN
Expand All @@ -34,6 +36,16 @@ vosk:
# TODO: whispering is still not supported, need to get
# the start/end messages right

gladia:
startMessage: '{"x_gladia_key": "", "sample_rate": 0, "bit_depth": 16, "model_type": "accurate" }'
endMessage: '{}'
server: ws://localhost:8777
proxy:
enabled: true
address: "wss://api.gladia.io/audio/text/audio-transcription"
minPartialDuration: 3
languageManual: true

whispering:
startMessage: '{"config": { "sample_rate": ""} }'
endMessage: '{"eof" : 1 }'
Expand Down
Loading

0 comments on commit 4e2714d

Please sign in to comment.