# import sounddevice as sd
import torch
import numpy as np
+ import datetime

+
+ def int2float(sound):
+     abs_max = np.abs(sound).max()
+     sound = sound.astype('float32')
+     if abs_max > 0:
+         sound *= 1 / 32768
+     sound = sound.squeeze()  # depends on the use case
+     return sound
+
class VoiceActivityController:
    def __init__(
        self,
        sampling_rate = 16000,
-         second_ofSilence = 0.5,
-         second_ofSpeech = 0.25,
+         min_silence_to_final_ms = 500,
+         min_speech_to_final_ms = 100,
+         min_silence_duration_ms = 100,
        use_vad_result = True,
        activity_detected_callback = None,
+         threshold = 0.3
    ):
        self.activity_detected_callback = activity_detected_callback
        self.model, self.utils = torch.hub.load(
@@ -26,84 +37,77 @@ def __init__(
         collect_chunks) = self.utils

        self.sampling_rate = sampling_rate
-         self.silence_limit = second_ofSilence * self.sampling_rate
-         self.speech_limit = second_ofSpeech * self.sampling_rate
+         self.final_silence_limit = min_silence_to_final_ms * self.sampling_rate / 1000
+         self.final_speech_limit = min_speech_to_final_ms * self.sampling_rate / 1000
+         self.min_silence_samples = sampling_rate * min_silence_duration_ms / 1000

        self.use_vad_result = use_vad_result
-         self.vad_iterator = VADIterator(
-             model = self.model,
-             threshold = 0.3,  # 0.5
-             sampling_rate = self.sampling_rate,
-             min_silence_duration_ms = 500,  # 100
-             speech_pad_ms = 400  # 30
-         )
        self.last_marked_chunk = None
-
-
-     def int2float(self, sound):
-         abs_max = np.abs(sound).max()
-         sound = sound.astype('float32')
-         if abs_max > 0:
-             sound *= 1 / 32768
-         sound = sound.squeeze()  # depends on the use case
-         return sound
+         self.threshold = threshold
+         self.reset_states()
+
+     def reset_states(self):
+         self.model.reset_states()
+         self.temp_end = 0
+         self.current_sample = 0

    def apply_vad(self, audio):
-         audio_float32 = self.int2float(audio)
-         chunk = self.vad_iterator(audio_float32, return_seconds=False)
-
-         if chunk is not None:
-             if "start" in chunk:
-                 start = chunk["start"]
-                 self.last_marked_chunk = chunk
-                 return audio[start:] if self.use_vad_result else audio, (len(audio) - start), 0
-
-             if "end" in chunk:
-                 # todo: pending get the padding from the next chunk
-                 end = chunk["end"] if chunk["end"] < len(audio) else len(audio)
-                 self.last_marked_chunk = chunk
-                 return audio[:end] if self.use_vad_result else audio, end, len(audio) - end
+         x = int2float(audio)
+         if not torch.is_tensor(x):
+             try:
+                 x = torch.Tensor(x)
+             except Exception:
+                 raise TypeError("Audio cannot be cast to tensor. Cast it manually.")
+
+         speech_prob = self.model(x, self.sampling_rate).item()
+
+         window_size_samples = len(x[0]) if x.dim() == 2 else len(x)
+         self.current_sample += window_size_samples
+

-         if self.last_marked_chunk is not None:
-             if "start" in self.last_marked_chunk:
-                 return audio, len(audio), 0
+         if speech_prob >= self.threshold:
+             self.temp_end = 0
+             return audio, window_size_samples, 0
+
+         else:
+             if not self.temp_end:
+                 self.temp_end = self.current_sample
+
+             if self.current_sample - self.temp_end < self.min_silence_samples:
+                 return audio, 0, window_size_samples
+             else:
+                 return np.array([], dtype=np.float16), 0, window_size_samples

-             if "end" in self.last_marked_chunk:
-                 return np.array([], dtype=np.float16) if self.use_vad_result else audio, 0, len(audio)

-         return np.array([], dtype=np.float16) if self.use_vad_result else audio, 0, 0


    def detect_user_speech(self, audio_stream, audio_in_int16 = False):
-         silence_len = 0
+         last_silence_len = 0
        speech_len = 0

        for data in audio_stream:  # replace with your condition of choice
-             # if isinstance(data, EndOfTransmission):
-             #     raise EndOfTransmission("End of transmission detected")

            audio_block = np.frombuffer(data, dtype=np.int16) if not audio_in_int16 else data
            wav = audio_block

-
            is_final = False
-             voice_audio, speech_in_wav, last_silent_duration_in_wav = self.apply_vad(wav)
-             # print(f'----r> speech_in_wav: {speech_in_wav} last_silent_duration_in_wav: {last_silent_duration_in_wav}')
+             voice_audio, speech_in_wav, last_silent_in_wav = self.apply_vad(wav)
+
            if speech_in_wav > 0:
-                 silence_len = 0
+                 last_silence_len = 0
                speech_len += speech_in_wav
                if self.activity_detected_callback is not None:
                    self.activity_detected_callback()

-             silence_len = silence_len + last_silent_duration_in_wav
-             if silence_len >= self.silence_limit and speech_len >= self.speech_limit:
+             last_silence_len += last_silent_in_wav
+             if last_silence_len >= self.final_silence_limit and speech_len >= self.final_speech_limit:
+
                is_final = True
-                 silence_len = 0
-                 speech_len = 0
-
+                 last_silence_len = 0
+                 speech_len = 0

            yield voice_audio.tobytes(), is_final
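
For reference, a minimal driver sketch for the refactored controller (not part of this commit). The module path, WAV file name, and 512-sample chunk size are assumptions: Silero VAD works on short fixed-size windows at 16 kHz, and the input is assumed to be 16-bit mono PCM.

# Hypothetical usage sketch; module path and file name are assumed.
import wave

from voice_activity_controller import VoiceActivityController


def pcm_chunks(path, samples_per_chunk=512):
    # Yield raw 16-bit mono PCM chunks; 512 samples per window is an assumption.
    with wave.open(path, "rb") as wav_file:
        while True:
            data = wav_file.readframes(samples_per_chunk)
            if len(data) < samples_per_chunk * 2:  # 2 bytes per int16 sample
                break
            yield data


vad = VoiceActivityController(sampling_rate=16000, use_vad_result=True)
utterance = bytearray()
for voiced_bytes, is_final in vad.detect_user_speech(pcm_chunks("speech_16k_mono.wav")):
    utterance.extend(voiced_bytes)
    if is_final:
        # min_silence_to_final_ms of trailing silence after at least
        # min_speech_to_final_ms of speech marks the utterance as final.
        print(f"utterance: {len(utterance) // 2} samples")
        utterance.clear()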