@@ -29,7 +29,7 @@ audio_url = "http://stream.live.vc.bbcmedia.co.uk/bbc_world_service"
29
29
# Define the WebSocket functions on_open, on_message, on_close, and on_error
30
30
def on_open (ws ):
31
31
print (" WebSocket connection established." )
32
-
32
+
33
33
# Start audio streaming thread
34
34
audio_thread = threading.Thread(target = stream_audio, args = (ws,))
35
35
audio_thread.daemon = True
@@ -367,3 +367,378 @@ print("File saved successfully at:", output_file_path)
367
367
368
368
```
369
369
370
+ ## Voice Agent Conversion using Deepgram API
371
+
372
+ ** Title:** Take audio from an input source and play the resulting agent audio
373
+
374
+ ** Code Sample:** voice-agent/voice-agent-play-audio/main.py
375
+
376
+ ** Description:** This Python script uses the Deepgram API to take audio from an input source and play the resulting agent audio on the selected output device.
377
+
378
+ ### voice-agent/voice-agent-play-audio/main.py
379
+
380
+ ``` python
381
+
382
+ import pyaudio
383
+ import asyncio
384
+ import sys
385
+ import os
386
+ import json
387
+ import inspect
388
+ import queue
389
+
390
+ import threading
391
+ from typing import Optional, Callable, Union
392
+
393
+ from websockets.sync.client import ClientConnection as SyncClientConnection
394
+
395
+ from websockets.sync.client import connect
396
+
397
+ TIMEOUT = 0.050
398
+ FORMAT = pyaudio.paInt16
399
+ CHANNELS = 1
400
+ RATE = 16000
401
+ CHUNK = 8000
402
+
403
+ def main ():
404
+ try :
405
+ dg_api_key = os.environ.get(" DEEPGRAM_API_KEY" )
406
+ if dg_api_key is None :
407
+ print (" DEEPGRAM_API_KEY env var not present" )
408
+ return
409
+
410
+ print (" \n\n\n Press Enter to stop...\n\n\n " )
411
+
412
+ _socket = connect(
413
+ " wss://agent.deepgram.com/agent" ,
414
+ additional_headers = {" Authorization" : f " Token { dg_api_key} " },
415
+ )
416
+
417
+ _config_message = {
418
+ " type" : " SettingsConfiguration" ,
419
+ " audio" : {
420
+ " input" : {
421
+ " encoding" : " linear16" ,
422
+ " sample_rate" : RATE ,
423
+ },
424
+ " output" : {
425
+ " encoding" : " linear16" ,
426
+ " sample_rate" : RATE ,
427
+ " container" : " none" ,
428
+ },
429
+ },
430
+ " agent" : {
431
+ " listen" : {" model" : " nova-2" },
432
+ " think" : {
433
+ " provider" : {
434
+ " type" : " open_ai" , # examples are anthropic, open_ai, groq, ollama
435
+ },
436
+ " model" : " gpt-4o-mini" , # examples are claude-3-haiku-20240307, gpt-3.5-turbo, mixtral-8x7b-32768, mistral
437
+ " instructions" : " You are a helpful AI assistant." ,
438
+ },
439
+ " speak" : {" model" : " aura-athena-en" },
440
+ },
441
+ }
442
+
443
+ _socket.send(json.dumps(_config_message))
444
+
445
+ speaker = Speaker()
446
+ speaker.start(_socket)
447
+
448
+ microphone = Microphone(push_callback = _socket.send)
449
+ microphone.start()
450
+
451
+ input ()
452
+
453
+ print (" Stopping microphone..." )
454
+ microphone.stop()
455
+
456
+ print (" Stopping speaker..." )
457
+ speaker.stop()
458
+
459
+ print (" Closing socket..." )
460
+ _socket.close()
461
+ _socket = None
462
+
463
+ except Exception as e:
464
+ print (f " main: { e} " )
465
+
466
+ class Microphone :
467
+ _audio: pyaudio.PyAudio
468
+ _chunk: int
469
+ _rate: int
470
+ _format: int
471
+ _channels: int
472
+ _input_device_index: Optional[int ]
473
+ _is_muted: bool
474
+
475
+ _stream: pyaudio.Stream
476
+ _asyncio_loop: asyncio.AbstractEventLoop
477
+ _asyncio_thread: threading.Thread
478
+ _exit: threading.Event
479
+
480
+ _push_callback_org: object
481
+ _push_callback: object
482
+
483
+ def __init__ (
484
+ self ,
485
+ push_callback : Optional[Callable] = None ,
486
+ rate : Optional[int ] = RATE ,
487
+ chunk : Optional[int ] = CHUNK ,
488
+ channels : Optional[int ] = CHANNELS ,
489
+ input_device_index : Optional[int ] = None ,
490
+ ):
491
+ self ._exit = threading.Event()
492
+
493
+ self ._audio = pyaudio.PyAudio()
494
+ self ._chunk = chunk
495
+ self ._rate = rate
496
+ self ._format = FORMAT
497
+ self ._channels = channels
498
+ self ._is_muted = False
499
+
500
+ self ._input_device_index = input_device_index
501
+ self ._push_callback_org = push_callback
502
+
503
+ def _start_asyncio_loop (self ) -> None :
504
+ self ._asyncio_loop = asyncio.new_event_loop()
505
+ self ._asyncio_loop.run_forever()
506
+
507
+ def is_active (self ) -> bool :
508
+ if self ._stream is None :
509
+ return False
510
+
511
+ val = self ._stream.is_active()
512
+ return val
513
+
514
+ def set_callback (self , push_callback : Callable) -> None :
515
+ self ._push_callback_org = push_callback
516
+
517
+ def start (self ) -> bool :
518
+ if self ._push_callback_org is None :
519
+ return False
520
+
521
+ if inspect.iscoroutinefunction(self ._push_callback_org):
522
+ self ._asyncio_thread = threading.Thread(target = self ._start_asyncio_loop)
523
+ self ._asyncio_thread.start()
524
+
525
+ self ._push_callback = lambda data : asyncio.run_coroutine_threadsafe(
526
+ self ._push_callback_org(data), self ._asyncio_loop
527
+ ).result()
528
+ else :
529
+ self ._push_callback = self ._push_callback_org
530
+
531
+ self ._stream = self ._audio.open(
532
+ format = self ._format,
533
+ channels = self ._channels,
534
+ rate = self ._rate,
535
+ input = True ,
536
+ output = False ,
537
+ frames_per_buffer = self ._chunk,
538
+ input_device_index = self ._input_device_index,
539
+ stream_callback = self ._callback,
540
+ )
541
+
542
+ self ._exit.clear()
543
+ self ._stream.start_stream()
544
+
545
+ return True
546
+
547
+ def _callback (self , input_data , frame_count , time_info , status_flags ):
548
+ if self ._exit.is_set():
549
+ return None , pyaudio.paAbort
550
+
551
+ if input_data is None :
552
+ return None , pyaudio.paContinue
553
+
554
+ try :
555
+ if self ._is_muted:
556
+ size = len (input_data)
557
+ input_data = b " \x00 " * size
558
+
559
+ self ._push_callback(input_data)
560
+ except Exception as e:
561
+ raise
562
+
563
+ return input_data, pyaudio.paContinue
564
+
565
+ def mute (self ) -> bool :
566
+ if self ._stream is None :
567
+ return False
568
+
569
+ self ._is_muted = True
570
+
571
+ return True
572
+
573
+ def unmute (self ) -> bool :
574
+ if self ._stream is None :
575
+ return False
576
+
577
+ self ._is_muted = False
578
+
579
+ return True
580
+
581
+ def stop (self ) -> bool :
582
+ self ._exit.set()
583
+
584
+ if self ._stream is not None :
585
+ self ._stream.stop_stream()
586
+ self ._stream.close()
587
+ self ._stream = None
588
+
589
+ if (
590
+ inspect.iscoroutinefunction(self ._push_callback_org)
591
+ and self ._asyncio_thread is not None
592
+ ):
593
+ self ._asyncio_loop.call_soon_threadsafe(self ._asyncio_loop.stop)
594
+ self ._asyncio_thread.join()
595
+ self ._asyncio_thread = None
596
+
597
+ return True
598
+
599
+ class Speaker :
600
+ _audio: pyaudio.PyAudio
601
+ _chunk: int
602
+ _rate: int
603
+ _format: int
604
+ _channels: int
605
+ _output_device_index: Optional[int ]
606
+
607
+ _queue: queue.Queue
608
+ _exit: threading.Event
609
+
610
+ _stream: pyaudio.Stream
611
+ _thread: threading.Thread
612
+ _asyncio_loop: asyncio.AbstractEventLoop
613
+ _receiver_thread: threading.Thread = None
614
+
615
+ _socket: SyncClientConnection
616
+ _push_callback_org: Callable = None
617
+ _push_callback: Callable = None
618
+ _loop: asyncio.AbstractEventLoop
619
+
620
+ def __init__ (
621
+ self ,
622
+ push_callback : Optional[Callable] = None ,
623
+ rate : int = RATE ,
624
+ chunk : int = CHUNK ,
625
+ channels : int = CHANNELS ,
626
+ output_device_index : Optional[int ] = None ,
627
+ ):
628
+ self ._exit = threading.Event()
629
+ self ._queue = queue.Queue()
630
+
631
+ self ._audio = pyaudio.PyAudio()
632
+ self ._chunk = chunk
633
+ self ._rate = rate
634
+ self ._format = FORMAT
635
+ self ._channels = channels
636
+ self ._output_device_index = output_device_index
637
+
638
+ self ._socket = None
639
+ self ._push_callback_org = push_callback
640
+
641
+ def set_callback (self , push_callback : Callable) -> None :
642
+ self ._push_callback_org = push_callback
643
+
644
+ def start (self , socket : SyncClientConnection) -> bool :
645
+ # Automatically get the current running event loop
646
+ if inspect.iscoroutinefunction(socket.send):
647
+ self ._loop = asyncio.get_running_loop()
648
+
649
+ self ._exit.clear()
650
+ self ._socket = socket
651
+
652
+ self ._stream = self ._audio.open(
653
+ format = self ._format,
654
+ channels = self ._channels,
655
+ rate = self ._rate,
656
+ input = False ,
657
+ output = True ,
658
+ frames_per_buffer = self ._chunk,
659
+ output_device_index = self ._output_device_index,
660
+ )
661
+
662
+ # determine if the push_callback is a coroutine
663
+ if inspect.iscoroutinefunction(self ._push_callback_org):
664
+ self ._push_callback = lambda data : asyncio.run_coroutine_threadsafe(
665
+ self ._push_callback_org(data), self ._asyncio_loop
666
+ ).result()
667
+ else :
668
+ self ._push_callback = self ._push_callback_org
669
+
670
+ # start the play thread
671
+ self ._thread = threading.Thread(
672
+ target = self ._play, args = (self ._queue, self ._stream, self ._exit), daemon = True
673
+ )
674
+ self ._thread.start()
675
+
676
+ # Start the stream
677
+ self ._stream.start_stream()
678
+
679
+ # Start the receiver thread within the start function
680
+ if self ._socket is not None :
681
+ print (" Starting receiver thread..." )
682
+ self ._receiver_thread = threading.Thread(
683
+ target = self ._start_receiver, args = (self ._socket,)
684
+ )
685
+ self ._receiver_thread.start()
686
+
687
+ return True
688
+
689
+ def _start_receiver (self , socket : SyncClientConnection):
690
+ print (" Starting threaded receiver..." )
691
+ self ._start_threaded_receiver(socket)
692
+
693
+ def _start_threaded_receiver (self , socket : SyncClientConnection):
694
+ try :
695
+ while True :
696
+ if socket is None or self ._exit.is_set():
697
+ break
698
+
699
+ message = socket.recv()
700
+ if message is None :
701
+ continue
702
+
703
+ if isinstance (message, str ):
704
+ print (message)
705
+ elif isinstance (message, bytes ):
706
+ self .add_audio_to_queue(message)
707
+ except Exception as e:
708
+ print (f " threaded receiver: { e} " )
709
+
710
+ def add_audio_to_queue (self , data ):
711
+ self ._queue.put(data)
712
+
713
+ def stop (self ):
714
+ self ._exit.set()
715
+
716
+ if self ._stream is not None :
717
+ self ._stream.stop_stream()
718
+ self ._stream.close()
719
+ self ._stream = None
720
+
721
+ self ._thread.join()
722
+ self ._thread = None
723
+
724
+ if self ._receiver_thread is not None :
725
+ self ._receiver_thread.join()
726
+ self ._receiver_thread = None
727
+
728
+ self ._socket = None
729
+ self ._queue = None
730
+
731
+ def _play (self , audio_out , stream , stop ):
732
+ while not stop.is_set():
733
+ try :
734
+ data = audio_out.get(True , TIMEOUT )
735
+ stream.write(data)
736
+ except queue.Empty:
737
+ pass
738
+ except Exception as e:
739
+ print (f " _play: { e} " )
740
+
741
+ if __name__ == " __main__" :
742
+ sys.exit(main() or 0 )
743
+
744
+ ```
0 commit comments