-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
executable file
·280 lines (234 loc) · 8.06 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
import json
import argparse
from pathlib import Path
import os
import subprocess
import logging
from tempfile import TemporaryDirectory
import spacy
import argostranslate.package
import argostranslate.translate
import whisper
from pytube import YouTube
# Field delimiter used in the generated Anki import file.
FILE_DELIM = ";"
# Header-line template telling Anki which tags to apply to every imported card.
TAGS = "#tags:{tags}"
# Anki's [sound:...] syntax for embedding an audio clip on a card.
AUDIO_FMT = "[sound:{fname}]"
# Default padding (seconds) added before/after each sentence clip (CLI default).
AUDIO_BUFFER = 0.35
# Maps input-language code -> spaCy pipeline used for sentence segmentation.
SPACY_MODELS = {"es": "es_core_news_sm", "it": 'it_core_news_sm'}
# Root logger; configured by setup_logging() when run as a script.
log = logging.getLogger()
def download_youtube_audio(video_src, out_dir) -> Path:
    """Download the audio-only stream of a YouTube video into *out_dir*.

    Returns the path of the downloaded file. The video title is reduced to
    its alphanumeric characters to build a filesystem-safe file name.
    """
    video = YouTube(video_src)
    stream = video.streams.get_audio_only()
    safe_title = "".join(ch for ch in video.title if ch.isalnum())
    downloaded = stream.download(
        output_path=out_dir, filename=f"yt_audio_{safe_title}.mp4"
    )
    log.info(f"Downloaded {video.title} to {downloaded}")
    return Path(downloaded)
def build_words_with_ts(data):
    """Flatten a whisper transcription result into (word, start, end) tuples.

    data: whisper output dict with per-segment word timestamps
    return: list of (stripped_word, start_time, end_time)
    """
    return [
        (word["word"].strip(), word["start"], word["end"])
        for segment in data["segments"]
        for word in segment["words"]
    ]
def build_sentence_start_end(doc, words_ts, audio_buffer):
    """Align spaCy sentences with whisper word timestamps.

    doc: spaCy Doc whose sentences cover exactly the words in words_ts
    words_ts: list of (word, start, end) from build_words_with_ts
    audio_buffer: seconds of padding applied to each clip boundary
    return: list of (start, end, sentence_text)
    raises AssertionError: when a sentence's first word does not line up
        with the expected entry in words_ts
    """
    sentence_ts = []
    word_idx = 0
    end_word_idx = 0
    for sent in doc.sents:
        sent_words = sent.text.split(" ")
        end_word_idx += len(sent_words)
        # Explicit check instead of `assert` so the alignment guard still
        # fires when Python runs with -O (asserts are stripped).
        if sent_words[0] != words_ts[word_idx][0]:
            log.error(
                f"mismatched word - {word_idx=} {end_word_idx=} {sent_words[0]=} { words_ts[word_idx][0]=}"
            )
            raise AssertionError(
                f"sentence/word alignment mismatch at word index {word_idx}"
            )
        # Pad the start, clamped at 0 so a word starting earlier than the
        # buffer never produces a negative ffmpeg -ss timestamp.
        start = max(words_ts[word_idx][1] - audio_buffer, 0)
        end = words_ts[end_word_idx - 1][2] + audio_buffer
        sentence_ts.append((start, end, sent.text))
        word_idx = end_word_idx
    return sentence_ts
def split_video_into_audio_seg(in_video, timestamps, out_name, out_dir):
    """Slice *in_video* into one audio clip per (start, end) timestamp.

    in_video: file to slice up
    timestamps: iterable of (start, end) pairs, each a str accepted by
        ffmpeg's -ss/-to options
    out_name: base name for the clips
    out_dir: directory to save slices in (created if missing)
    return: list of created clip file names (relative to out_dir)
    """
    # exist_ok avoids the check-then-create race of exists()/makedirs().
    os.makedirs(out_dir, exist_ok=True)
    created_files = []
    for n, (start, end) in enumerate(timestamps):
        fname = f"{out_name}-{n}-clip.mp4"
        args = [
            "ffmpeg",
            "-ss",
            start,
            "-to",
            end,
            "-i",
            in_video,
            "-map",
            "0:a",  # keep only the audio stream
            "-y",  # overwrite any existing clip
            f"{out_dir}/{fname}",
        ]
        # ffmpeg is chatty, so its output is suppressed — but a failed clip
        # should not pass silently: surface nonzero exits in the log while
        # keeping the overall run best-effort.
        result = subprocess.run(
            args, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
        )
        if result.returncode != 0:
            log.error(f"ffmpeg failed (exit {result.returncode}) for {fname}")
        created_files.append(fname)
    return created_files
def translate_setup(from_code, to_code):
    """Download and install the Argos Translate package for a language pair.

    Raises StopIteration if no package matches from_code -> to_code.
    """
    argostranslate.package.update_package_index()
    candidates = argostranslate.package.get_available_packages()
    matches = (
        pkg
        for pkg in candidates
        if pkg.from_code == from_code and pkg.to_code == to_code
    )
    package = next(matches)
    argostranslate.package.install_from_path(package.download())
def translate_sentences(sents, from_code, to_code):
    """Translate each sentence from from_code to to_code via Argos Translate."""
    translated = []
    for sentence in sents:
        translated.append(
            argostranslate.translate.translate(sentence, from_code, to_code)
        )
    return translated
def write_anki_import(file_path, sentences, translation, audio_files, tags=None):
    """Write the Anki import file: one card line per sentence.

    Each line pairs an audio clip reference with its source sentence and
    translation, separated by FILE_DELIM. When *tags* is given, a #tags
    header line is emitted first so Anki tags every imported card.
    """
    assert len(sentences) == len(translation) == len(audio_files)
    with open(file_path, "w", encoding="utf-8") as out:
        if tags:
            out.write(TAGS.format(tags=tags) + "\n")
        for audio, sent, trans in zip(audio_files, sentences, translation):
            clip = AUDIO_FMT.format(fname=audio)
            out.write(f"{clip}{FILE_DELIM}{sent}<br>{trans}\n")
def transcribe(in_file, lang_code):
    """Transcribe *in_file* with whisper's small model, word timestamps on."""
    model = whisper.load_model("small")
    # word_timestamps is required downstream by build_words_with_ts.
    return model.transcribe(in_file, word_timestamps=True, language=lang_code)
def main(
    source: str,
    input_language: str,
    output_language: str,
    audio_save_dir: Path,
    json_transcribe_file: Path,
    audio_buffer: float,
    save_json_file: Path,
):
    """End-to-end pipeline: media -> transcript -> sentence clips -> Anki file.

    source: local media file path, or a YouTube URL to download first
    input_language, output_language: the input and output language codes
    audio_save_dir: existing directory to save the audio clips in
    json_transcribe_file: optional whisper JSON (with word timestamps) to
        load instead of transcribing in this run
    audio_buffer: seconds added onto each clip's start and end
    save_json_file: optional location to save the transcript JSON after
        transcribing (must not already exist)
    raises FileNotFoundError/NotADirectoryError/FileExistsError/ValueError:
        on invalid arguments
    """
    if "youtube" in source and source.startswith("http"):
        # Keep a reference to the TemporaryDirectory so the downloaded audio
        # survives until main() returns.
        tmp_dir = TemporaryDirectory()
        source_file = download_youtube_audio(source, tmp_dir.name)
    else:
        source_file = Path(source)
    # Validate inputs with real exceptions: `assert` is stripped under -O
    # and gives no usable message to the user.
    if not source_file.exists():
        raise FileNotFoundError(f"source file not found: {source_file}")
    if not (audio_save_dir.exists() and audio_save_dir.is_dir()):
        raise NotADirectoryError(f"audio save dir missing: {audio_save_dir}")
    if json_transcribe_file and not json_transcribe_file.exists():
        raise FileNotFoundError(
            f"transcribe file not found: {json_transcribe_file}"
        )
    if save_json_file and save_json_file.exists():
        raise FileExistsError(f"refusing to overwrite {save_json_file}")
    spacy_model = SPACY_MODELS.get(input_language)
    if spacy_model is None:
        # Fail clearly here instead of letting spacy.load(None) blow up.
        raise ValueError(
            f"unsupported input language {input_language!r}; "
            f"expected one of {sorted(SPACY_MODELS)}"
        )
    base_name = source_file.parts[-1].split(".")[0]
    spacy.prefer_gpu()
    # Only sentence segmentation is needed; `senter` is far cheaper than the
    # full dependency parser.
    nlp = spacy.load(spacy_model, exclude=["parser"])
    nlp.enable_pipe("senter")
    log.info("spacy loaded...")
    translate_setup(input_language, output_language)
    if json_transcribe_file:
        with open(json_transcribe_file, "r", encoding="utf-8") as f:
            data = json.load(f)
        log.info("whisper transcribe loaded")
    else:
        log.info("start whisper transcribe")
        data = transcribe(str(source_file), input_language)
        log.info("whisper transcribe done")
        if save_json_file:
            with open(save_json_file, "w", encoding="utf-8") as f:
                json.dump(data, f)
    words_ts = build_words_with_ts(data)
    log.info("built sentence ts data...")
    src_txt = " ".join(w[0] for w in words_ts)
    doc = nlp(src_txt)
    sentence_ts = build_sentence_start_end(doc, words_ts, audio_buffer)
    # ffmpeg -ss/-to want string timestamps.
    clip_ranges = [(str(s), str(e)) for (s, e, _) in sentence_ts]
    log.info("starting file split")
    created_files = split_video_into_audio_seg(
        source_file, clip_ranges, base_name, audio_save_dir
    )
    sents = [text for (_s, _e, text) in sentence_ts]
    log.info("audio_created")
    log.info("starting translation")
    translations = translate_sentences(sents, input_language, output_language)
    log.info(f"writing anki to ./{base_name}.txt")
    write_anki_import(f"./{base_name}.txt", sents, translations, created_files)
    log.info("done")
def setup_logging():
    """Configure the root logger for terse, timestamped console output."""
    config = {
        "level": logging.INFO,
        "format": "[%(asctime)s] %(message)s",
        "datefmt": "%H:%M:%S",
    }
    logging.basicConfig(**config)
if __name__ == "__main__":
    setup_logging()
    log.info("starting...")
    parser = argparse.ArgumentParser()
    # Human-readable list of language codes that have a spaCy model
    # configured; the original "."-join rendered as an unreadable "es.it".
    supported = ", ".join(SPACY_MODELS.keys())
    parser.add_argument(
        "source",
        help="The full path to the video to process or Youtube video to download",
        type=str,
    )
    parser.add_argument(
        "--input-language",
        "-i",
        help=f"Language code for input language ({supported})",
        default="es",
        type=str,
    )
    parser.add_argument(
        "--output-language",
        "-o",
        help=f"Language code for output language ({supported})",
        default="en",
        type=str,
    )
    parser.add_argument(
        "--audio-save-dir",
        "-s",
        help="Directory to save audio clips in",
        default="./out",
        type=Path,
    )
    parser.add_argument(
        "--json-transcribe-file",
        "-j",
        help="Path to load whisper's json transcription file, will skip doing a new transcription",
        type=Path,
    )
    parser.add_argument(
        "--audio-buffer",
        "-b",
        help="Time to add on to start and end of audio sentence clips",
        default=AUDIO_BUFFER,
        type=float,
    )
    parser.add_argument(
        "--save-json-file",
        "-w",
        help="location to save whisper's json data from transcription, can be used to reload and avoid rerunning transcription",
        type=Path,
    )
    args = parser.parse_args()
    # vars() is the idiomatic spelling of args.__dict__.
    main(**vars(args))