Skip to content

Commit 40b353f

Browse files
authored
added examples for tts websockets using asyncio (#8)
`save_to_disk.py` saves the audio to disk `play.py` plays the audio to the computer speakers using pyaudio and is therefore much more complicated than `save_to_disk.py`
1 parent 0b06af0 commit 40b353f

File tree

2 files changed

+256
-0
lines changed

2 files changed

+256
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
import os
2+
import json
3+
import asyncio
4+
import aiohttp
5+
6+
import pyaudio
7+
8+
api_key = os.environ["DEEPGRAM_API_KEY"]
9+
headers = {
10+
"Authorization": f"Token {api_key}",
11+
}
12+
13+
# URL for Deepgram WebSocket API
14+
DEEPGRAM_WS_URL = "wss://api.deepgram.com/v1/speak"
15+
16+
# Text to speak
17+
TEXT: str = """
18+
The sun had just begun to rise over the sleepy town of Millfield.
19+
Emily a young woman in her mid-twenties was already awake and bustling about.
20+
"""
21+
TEXT = TEXT.strip()
22+
23+
# Audio settings
24+
FORMAT = pyaudio.paInt16
25+
CHANNELS = 1
26+
SAMPLE_RATE = 48000
27+
CHUNK_SIZE = 8000
28+
29+
30+
class AsyncSpeaker:
31+
def __init__(
32+
self,
33+
rate: int = SAMPLE_RATE,
34+
chunk_size: int = CHUNK_SIZE,
35+
channels: int = CHANNELS,
36+
output_device_index: int = None,
37+
):
38+
self._audio = pyaudio.PyAudio()
39+
self._chunk = chunk_size
40+
self._rate = rate
41+
self._format = FORMAT
42+
self._channels = channels
43+
self._output_device_index = output_device_index
44+
self._stream = None
45+
self._audio_queue = asyncio.Queue()
46+
self._is_playing = False
47+
48+
def start(self) -> bool:
49+
self._stream = self._audio.open(
50+
format=self._format,
51+
channels=self._channels,
52+
rate=self._rate,
53+
input=False,
54+
output=True,
55+
frames_per_buffer=self._chunk,
56+
output_device_index=self._output_device_index,
57+
)
58+
self._stream.start_stream()
59+
self._is_playing = True
60+
return True
61+
62+
def stop(self):
63+
self._is_playing = False
64+
if self._stream is not None:
65+
self._stream.stop_stream()
66+
self._stream.close()
67+
self._stream = None
68+
69+
async def play(self, data):
70+
await self._audio_queue.put(data)
71+
72+
async def _play_audio(self):
73+
while self._is_playing:
74+
try:
75+
data = await asyncio.wait_for(self._audio_queue.get(), timeout=0.050)
76+
self._stream.write(data)
77+
self._audio_queue.task_done()
78+
except asyncio.TimeoutError:
79+
continue
80+
except Exception as e:
81+
print(f"_play_audio error: {e}")
82+
break
83+
84+
85+
def chunk_text(text: str, words_per_chunk: int):
86+
words = text.split()
87+
for i in range(0, len(words), words_per_chunk):
88+
yield " ".join(words[i : i + words_per_chunk])
89+
90+
91+
async def stream_text_to_websocket():
92+
speaker = AsyncSpeaker()
93+
async with aiohttp.ClientSession() as session:
94+
url = f"{DEEPGRAM_WS_URL}?encoding=linear16&sample_rate={SAMPLE_RATE}"
95+
async with session.ws_connect(url, headers=headers) as ws:
96+
print("WebSocket connection established.")
97+
CLOSE_MESSAGE_RECEIVED = False
98+
99+
async def send_text_stream():
100+
for a_few_words in chunk_text(TEXT, 3):
101+
await asyncio.sleep(0.5) # pause between sending text
102+
print(f"Sending: {a_few_words}")
103+
await ws.send_str(
104+
json.dumps({"type": "Speak", "text": a_few_words})
105+
)
106+
await ws.send_str(json.dumps({"type": "Flush"}))
107+
await ws.send_str(json.dumps({"type": "Close"}))
108+
# Wait until Deepgram closes the websocket, then close it on this end
109+
while not CLOSE_MESSAGE_RECEIVED:
110+
await asyncio.sleep(0.1)
111+
await ws.close()
112+
print("WebSocket connection closed.")
113+
114+
async def receive_audio_stream():
115+
speaker.start()
116+
try:
117+
audio_player = asyncio.create_task(speaker._play_audio())
118+
last_audio_duration = 0
119+
nonlocal CLOSE_MESSAGE_RECEIVED
120+
while True:
121+
try:
122+
message = await ws.receive(timeout=2)
123+
except asyncio.TimeoutError:
124+
continue
125+
if message.type == aiohttp.WSMsgType.BINARY:
126+
last_audio_duration = len(message.data) / (
127+
SAMPLE_RATE * CHANNELS * 2
128+
)
129+
await speaker.play(message.data)
130+
elif message.type == aiohttp.WSMsgType.CLOSE:
131+
CLOSE_MESSAGE_RECEIVED = True
132+
break
133+
134+
# Wait for remaining audio to be sent to the player
135+
await speaker._audio_queue.join()
136+
# Wait for the last bit of audio to be played
137+
await asyncio.sleep(last_audio_duration + 0.5)
138+
speaker.stop()
139+
audio_player.cancel()
140+
141+
except Exception as e:
142+
print(f"receiver error: {vars(e)}")
143+
speaker.stop()
144+
145+
await asyncio.gather(send_text_stream(), receive_audio_stream())
146+
147+
148+
async def main():
149+
await stream_text_to_websocket()
150+
151+
152+
if __name__ == "__main__":
153+
asyncio.run(main())
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
import os
2+
import json
3+
import struct
4+
import asyncio
5+
import aiohttp
6+
7+
import pyaudio
8+
9+
api_key = os.environ["DEEPGRAM_API_KEY"]
10+
headers = {
11+
"Authorization": f"Token {api_key}",
12+
}
13+
14+
# URL for Deepgram WebSocket API
15+
DEEPGRAM_WS_URL = "wss://api.deepgram.com/v1/speak"
16+
17+
# Text to speak
18+
TEXT: str = """
19+
The sun had just begun to rise over the sleepy town of Millfield.
20+
Emily a young woman in her mid-twenties was already awake and bustling about.
21+
"""
22+
TEXT = TEXT.strip()
23+
24+
# Audio settings
25+
FORMAT = pyaudio.paInt16
26+
CHANNELS = 1
27+
SAMPLE_RATE = 48000
28+
CHUNK_SIZE = 8000
29+
30+
31+
def generate_wav_header(sample_rate: int, channels: int):
32+
BITS_PER_SAMPLE = 8
33+
byte_rate = sample_rate * channels * (BITS_PER_SAMPLE // 8)
34+
header = b""
35+
header += b"RIFF"
36+
header += struct.pack("<I", 0)
37+
header += b"WAVE"
38+
header += b"fmt "
39+
header += struct.pack("<IHHIIHH", 16, 1, 1, sample_rate, byte_rate, 2, 16)
40+
header += b"data"
41+
header += struct.pack("<I", 0)
42+
return header
43+
44+
45+
def chunk_text(text: str, words_per_chunk: int):
46+
words = text.split()
47+
for i in range(0, len(words), words_per_chunk):
48+
yield " ".join(words[i : i + words_per_chunk])
49+
50+
51+
async def stream_text_to_websocket():
52+
async with aiohttp.ClientSession() as session:
53+
url = f"{DEEPGRAM_WS_URL}?sample_rate={SAMPLE_RATE}"
54+
async with session.ws_connect(url, headers=headers) as ws:
55+
print("WebSocket connection established.")
56+
CLOSE_MESSAGE_RECEIVED = False
57+
58+
async def send_text_stream():
59+
for a_few_words in chunk_text(TEXT, 3):
60+
await asyncio.sleep(0.5) # pause between sending text
61+
print(f"Sending: {a_few_words}")
62+
await ws.send_str(
63+
json.dumps({"type": "Speak", "text": a_few_words})
64+
)
65+
await ws.send_str(json.dumps({"type": "Flush"}))
66+
await ws.send_str(json.dumps({"type": "Close"}))
67+
# Wait until Deepgram closes the websocket, then close it on this end
68+
while not CLOSE_MESSAGE_RECEIVED:
69+
await asyncio.sleep(0.1)
70+
await ws.close()
71+
print("WebSocket connection closed.")
72+
73+
async def receive_audio_stream():
74+
try:
75+
nonlocal CLOSE_MESSAGE_RECEIVED
76+
with open("output.wav", "wb") as f:
77+
header = generate_wav_header(SAMPLE_RATE, CHANNELS)
78+
f.write(header)
79+
while True:
80+
try:
81+
message = await ws.receive(timeout=2)
82+
except asyncio.TimeoutError:
83+
continue
84+
if message.type == aiohttp.WSMsgType.BINARY:
85+
f.write(message.data)
86+
f.flush()
87+
elif message.type == aiohttp.WSMsgType.CLOSE:
88+
CLOSE_MESSAGE_RECEIVED = True
89+
break
90+
print("Audio saved to `output.wav`")
91+
92+
except Exception as e:
93+
print(f"receiver error: {vars(e)}")
94+
95+
await asyncio.gather(send_text_stream(), receive_audio_stream())
96+
97+
98+
async def main():
99+
await stream_text_to_websocket()
100+
101+
102+
if __name__ == "__main__":
103+
asyncio.run(main())

0 commit comments

Comments
 (0)