Skip to content

Commit

Permalink
echo watermark (#1)
Browse files Browse the repository at this point in the history
* add: echo watermark for wav
  • Loading branch information
guofei9987 authored Dec 12, 2023
1 parent cd380ef commit 0919ac5
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 152 deletions.
19 changes: 10 additions & 9 deletions example/example_echo_watermark.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,34 @@
from hide_info.echo_watermark import EchoWatermark, get_error_rate, get_snr
from hide_info import utils

ori_file = "音乐.wav"
embedded_file = "打入水印_音乐.wav" # 嵌入了水印的文件
wm_str = "回声水印算法测试用水印"
ori_file = "sounds.wav"
embedded_file = "sounds_with_watermark.wav"
wm_str = "回声水印算法,欢迎 star!"

wm_bits = [int(i) for i in utils.bytes2bin(wm_str.encode('utf-8'))]
wm_bits = utils.bytes2bin(wm_str.encode('utf-8'))

len_wm_bits = len(wm_bits)

# embed:
echo_wm = EchoWatermark(pwd=111001, algo_type=2)
echo_wm = EchoWatermark(pwd=111001)
echo_wm.embed(origin_filename=ori_file, wm_bits=wm_bits, embed_filename=embedded_file)

# extract:
echo_wm = EchoWatermark(pwd=111001, algo_type=2)
echo_wm = EchoWatermark(pwd=111001)
wm_extract = echo_wm.extract(embed_filename=embedded_file, len_wm_bits=len_wm_bits)

wm_str_extract = utils.bin2bytes(''.join([str(int(i)) for i in wm_extract])).decode('utf-8')
wm_str_extract = utils.bin2bytes(wm_extract).decode('utf-8', errors='replace')
print("解出水印:", wm_str_extract)
get_error_rate(wm_extract, wm_bits)

# %%
# %% There are 3 algorithms:
wm_bits = np.random.randint(2, size=200).tolist()
len_wm_bits = len(wm_bits)

for algo_type in [1, 2, 3]:
echo_wm = EchoWatermark(pwd=111001, algo_type=algo_type)
echo_wm.embed(origin_filename=ori_file, wm_bits=wm_bits, embed_filename=embedded_file)
wm_extract = echo_wm.extract(embed_filename=embedded_file, len_wm_bits=len_wm_bits)
get_error_rate(wm_extract, wm_bits)
error_rate = get_error_rate(wm_extract, wm_bits)
get_snr(embedded_file, ori_file)
assert error_rate <= 0.03
4 changes: 2 additions & 2 deletions example/example_hide_in_music.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@

text = "待嵌入到音乐文件的文本,下面的代码中,会把这段文本以二进制形式隐藏到一个音乐文件中"

hide_in_music.encode(text.encode('utf-8'), music_filename="音乐.wav", music_filename_new="藏文于音.wav")
hide_in_music.encode(text.encode('utf-8'), music_filename="sounds.wav", music_filename_new="藏文于音.wav")

text_encode = hide_in_music.decode(music_filename="藏文于音.wav")

print(text_encode.decode('utf-8'))

# %%
# 把文件隐藏到某个音乐中
hide_in_music.file_encode(filename='要隐藏的文件.zip', music_filename="音乐.wav", music_filename_new="藏物于音.wav")
hide_in_music.file_encode(filename='要隐藏的文件.zip', music_filename="sounds.wav", music_filename_new="藏物于音.wav")
# 从音乐中提取文件
hide_in_music.file_decode(filename="藏物于音-解出的文件.zip", music_filename="藏物于音.wav")

Expand Down
File renamed without changes.
230 changes: 92 additions & 138 deletions hide_info/echo_watermark.py
Original file line number Diff line number Diff line change
@@ -1,45 +1,43 @@
# 回声音频水印
import numpy as np
from scipy.io import wavfile
from scipy.signal import windows
from scipy.signal.windows import hann
from numpy.fft import ifft, fft


class EchoWatermark:
def __init__(self, pwd, algo_type=2, verbose=False):
def __init__(self, pwd, algo_type=3, verbose=False):
self.pwd = pwd
self.algo_type = algo_type
self.verbose = verbose

self.FRAME_LENGTH = 2048 # 帧长度
self.CONTROL_STRENGTH = 0.2 # 嵌入强度
self.OVERLAP = 0.5 # 帧分析的重叠率
self.NEGATIVE_DELAY = 4 # negative delay, for negative echo
self.LOG_FLOOR = 0.00001
self.frame_len = 2048 # 帧长度
self.echo_amplitude = 0.2 # 回声幅度
self.overlap = 0.5 # 帧分析的重叠率
self.neg_delay = 4 # negative delay, for negative echo

# 回声参数
# for key 1
# pwd[i] = 1
self.delay11, self.delay10 = 100, 110
# for key 0
# pwd[i] = 0
self.delay01, self.delay00 = 120, 130

def embed(self, origin_filename,wm_bits, embed_filename):
pwd = self.pwd
algo_type = self.algo_type
FRAME_LENGTH = self.FRAME_LENGTH
CONTROL_STRENGTH = self.CONTROL_STRENGTH
OVERLAP = self.OVERLAP
NEGATIVE_DELAY = self.NEGATIVE_DELAY
LOG_FLOOR = self.LOG_FLOOR
delay11, delay10, delay01, delay00 = self.delay11, self.delay10, self.delay01, self.delay00
def embed(self, origin_filename, wm_bits, embed_filename):
frame_len = self.frame_len
echo_amplitude = self.echo_amplitude
overlap = self.overlap
neg_delay = self.neg_delay

delay_matrix = [[self.delay00, self.delay01], [self.delay10, self.delay11]]

sr, host_signal = wavfile.read(origin_filename)
signal_len = len(host_signal)
sr, ori_signal = wavfile.read(origin_filename)
signal_len = len(ori_signal)

# 帧的移动量
frame_shift = int(FRAME_LENGTH * (1 - OVERLAP))
frame_shift = int(frame_len * (1 - overlap))

# 和相邻帧的重叠长度
overlap_length = int(FRAME_LENGTH * OVERLAP)
overlap_length = int(frame_len * overlap)

# 可嵌入总比特数
embed_nbit_ = (signal_len - overlap_length) // frame_shift
Expand All @@ -57,81 +55,62 @@ def embed(self, origin_filename,wm_bits, embed_filename):
f"可以嵌入的总比特数为: {embed_nbit_},水印长度为{len(wm_bits)},重复嵌入 {n_repeat} 次, 实际嵌入{embed_nbit}")

# 扩展水印信号
wmark_extended = np.repeat(wm_bits, n_repeat)
wm_repeat = np.repeat(wm_bits, n_repeat)

# 生成密钥
np.random.seed(pwd)
np.random.seed(self.pwd)
secret_key = np.random.randint(2, size=int(len_wm_bits))
secret_key_extended = np.repeat(secret_key, n_repeat)

pointer = 0
echoed_signal1 = np.zeros((frame_shift * embed_nbit))
prev1 = np.zeros((FRAME_LENGTH))
de = NEGATIVE_DELAY #
echoed_signal = np.zeros((frame_shift * embed_nbit))
prev1 = np.zeros(frame_len)

for i in range(embed_nbit):
frame = host_signal[pointer: (pointer + FRAME_LENGTH)]
frame = ori_signal[pointer: (pointer + frame_len)]

if secret_key_extended[i] == 1:
if wmark_extended[i] == 1:
delay = delay11
else:
delay = delay10
else:
if wmark_extended[i] == 1:
delay = delay01
else:
delay = delay00

echo_positive = CONTROL_STRENGTH \
* np.concatenate((np.zeros(delay),
frame[0:FRAME_LENGTH - delay]))

echo_negative = - CONTROL_STRENGTH \
* np.concatenate((np.zeros(delay + de),
frame[0:FRAME_LENGTH - delay - de]))

echo_forward = CONTROL_STRENGTH \
* np.concatenate((frame[delay:FRAME_LENGTH], np.zeros(delay)))

if algo_type == 1:
echoed_frame1 = frame + echo_positive
elif algo_type == 2:
echoed_frame1 = frame + echo_positive + echo_negative
else: # algo_type == 3
echoed_frame1 = frame + echo_positive + echo_forward
delay = delay_matrix[secret_key_extended[i]][wm_repeat[i]]

echoed_frame1 = echoed_frame1 * windows.hann(FRAME_LENGTH)
echoed_signal1[frame_shift * i: frame_shift * (i + 1)] = \
np.concatenate((prev1[frame_shift:FRAME_LENGTH] +
echoed_frame1[0:overlap_length],
echoed_frame1[overlap_length:frame_shift]))
echo_positive = np.concatenate((np.zeros(delay), frame[0:frame_len - delay]))

prev1 = echoed_frame1
pointer = pointer + frame_shift
echo_negative = - np.concatenate((np.zeros(delay + neg_delay),
frame[0:frame_len - delay - neg_delay]))

echo_forward = np.concatenate((frame[delay:frame_len], np.zeros(delay)))

echoed_signal1 = np.concatenate(
(echoed_signal1, host_signal[len(echoed_signal1): signal_len]))
if self.algo_type == 1:
echoed_frame = frame + echo_amplitude * echo_positive
elif self.algo_type == 2:
echoed_frame = frame + echo_amplitude * (echo_positive + echo_negative)
else: # algo_type == 3
echoed_frame = frame + echo_amplitude * (echo_positive + echo_forward)

echoed_frame = echoed_frame * hann(frame_len)
echoed_signal[frame_shift * i: frame_shift * (i + 1)] = \
np.concatenate((prev1[frame_shift:frame_len] +
echoed_frame[0:overlap_length],
echoed_frame[overlap_length:frame_shift]))

# 将保存为wav格式
echoed_signal1 = echoed_signal1.astype(np.int16)
wavfile.write(embed_filename, sr, echoed_signal1)
prev1 = echoed_frame
pointer += frame_shift

echoed_signal = np.concatenate((echoed_signal, ori_signal[len(echoed_signal):])).astype(np.int16)
# 保存为wav格式
wavfile.write(embed_filename, sr, echoed_signal)

def extract(self, embed_filename, len_wm_bits):
pwd = self.pwd
algo_type = self.algo_type
FRAME_LENGTH = self.FRAME_LENGTH
CONTROL_STRENGTH = self.CONTROL_STRENGTH
OVERLAP = self.OVERLAP
NEGATIVE_DELAY = self.NEGATIVE_DELAY
LOG_FLOOR = self.LOG_FLOOR
frame_len = self.frame_len
overlap = self.overlap
neg_delay = self.neg_delay
delay11, delay10, delay01, delay00 = self.delay11, self.delay10, self.delay01, self.delay00
log_floor = 0.00001 # 取对数时的最小值

# 打开已嵌入水印的音频文件
_, eval_signal1 = wavfile.read(embed_filename)
signal_len = len(eval_signal1)
_, wm_signal = wavfile.read(embed_filename)
signal_len = len(wm_signal)

frame_shift = int(FRAME_LENGTH * (1 - OVERLAP))
embed_nbit_ = (signal_len - int(FRAME_LENGTH * OVERLAP)) // frame_shift
frame_shift = int(frame_len * (1 - overlap))
embed_nbit_ = (signal_len - int(frame_len * overlap)) // frame_shift

# 重复次数
n_repeat = embed_nbit_ // len_wm_bits
Expand All @@ -144,92 +123,67 @@ def extract(self, embed_filename, len_wm_bits):
f"可以嵌入的总比特数为: {embed_nbit_},水印长度为{len_wm_bits},重复嵌入 {n_repeat} 次, 实际嵌入{embed_nbit}")

# 加载密钥
np.random.seed(pwd)
np.random.seed(self.pwd)
secret_key = np.random.randint(2, size=int(len_wm_bits))
secret_key = np.repeat(secret_key, n_repeat)

pointer = 0
detected_bit1 = np.zeros(embed_nbit)
for i in range(embed_nbit):
wmarked_frame1 = eval_signal1[pointer: pointer + FRAME_LENGTH]
ceps1 = np.fft.ifft(
np.log(np.square(np.fft.fft(wmarked_frame1)) + LOG_FLOOR)).real
wmarked_frame1 = wm_signal[pointer: pointer + frame_len]
ceps1 = ifft(
np.log(np.square(fft(wmarked_frame1)) + log_floor)).real

if secret_key[i] == 1:
if algo_type == 1:
if ceps1[delay11] > ceps1[delay10]:
detected_bit1[i] = 1
else:
detected_bit1[i] = 0
elif algo_type == 2:
if (ceps1[delay11] - ceps1[delay11 + NEGATIVE_DELAY]) > \
(ceps1[delay10] - ceps1[delay10 + NEGATIVE_DELAY]):
detected_bit1[i] = 1
else:
detected_bit1[i] = 0
else: # algo_type == 3
if ceps1[delay11] > ceps1[delay10]:
detected_bit1[i] = 1
else:
detected_bit1[i] = 0

delay0, delay1 = delay10, delay11
else:
if algo_type == 1:
if ceps1[delay01] > ceps1[delay00]:
detected_bit1[i] = 1
else:
detected_bit1[i] = 0
elif algo_type == 2:
if (ceps1[delay01] - ceps1[delay01 + NEGATIVE_DELAY]) > \
(ceps1[delay00] - ceps1[delay00 + NEGATIVE_DELAY]):
detected_bit1[i] = 1
else:
detected_bit1[i] = 0
else:
if ceps1[delay01] > ceps1[delay00]:
detected_bit1[i] = 1
else:
detected_bit1[i] = 0
delay0, delay1 = delay00, delay01

if self.algo_type == 1:
if ceps1[delay1] > ceps1[delay0]:
detected_bit1[i] = 1
elif self.algo_type == 2:
if (ceps1[delay1] - ceps1[delay1 + neg_delay]) > \
(ceps1[delay0] - ceps1[delay0 + neg_delay]):
detected_bit1[i] = 1
else: # algo_type == 3
if ceps1[delay1] > ceps1[delay0]:
detected_bit1[i] = 1

pointer = pointer + frame_shift

count = 0
wmark_recovered1 = np.zeros(len_wm_bits)
# wmark_recovered2 = np.zeros(len_wm_bits)
# wmark_recovered3 = np.zeros(len_wm_bits)
wm_extract = np.zeros(len_wm_bits)

for i in range(len_wm_bits):

# 汇总比特值(平均值)
ave = np.sum(detected_bit1[count:count + n_repeat]) / n_repeat
# 汇总比特值(按平均值)
ave = np.average(detected_bit1[count:count + n_repeat])
if ave >= 0.5:
wmark_recovered1[i] = 1
wm_extract[i] = 1
else:
wmark_recovered1[i] = 0
wm_extract[i] = 0

count = count + n_repeat

return wmark_recovered1
return wm_extract


def get_error_rate(wmark_recovered, wm_bits):
def get_error_rate(wm_extract, wm_bits):
# 计算错误率
len_wm_bits = len(wm_bits)
denom = np.sum(np.abs(wmark_recovered - wm_bits))
BER = np.sum(np.abs(wmark_recovered - wm_bits)) / \
len_wm_bits * 100
print(f'bit error rate = {BER:.2f}% ({denom} / {len_wm_bits})')
return BER
error_num = np.sum(np.abs(wm_extract - wm_bits))
error_rate = error_num / len_wm_bits
print(f'bit error rate = {error_rate:.2%} ({error_num} / {len_wm_bits})')
return error_rate


def get_snr(wav_with_wm, orig_file):
sr, host_signal = wavfile.read(orig_file)
_, eval_signal1 = wavfile.read(wav_with_wm)
def get_snr(file_with_wm, orig_file):
sr, ori_signal = wavfile.read(orig_file)
_, wm_signal = wavfile.read(file_with_wm)

SNR = 10 * np.log10(
np.sum(np.square(host_signal.astype(np.float32)))
/ np.sum(np.square(host_signal.astype(np.float32)
- eval_signal1.astype(np.float32))))
np.sum(np.square(ori_signal.astype(np.float32)))
/ np.sum(np.square(ori_signal.astype(np.float32)
- wm_signal.astype(np.float32))))
print(f'SNR = {SNR:.2f} dB')
return SNR

Loading

0 comments on commit 0919ac5

Please sign in to comment.