Chore-docker #314

Open · wants to merge 15 commits into base: master
4 changes: 3 additions & 1 deletion .env.sample
@@ -15,4 +15,6 @@ TWILIO_PHONE_NUMBER=
 # Plivo credentials
 PLIVO_AUTH_ID=
 PLIVO_AUTH_TOKEN=
-PLIVO_PHONE_NUMBER=
+PLIVO_PHONE_NUMBER=
+
+NGROK_AUTHTOKEN=
4 changes: 3 additions & 1 deletion .gitignore
@@ -167,4 +167,6 @@ agent_data/**/mp3
 */__pycache__/
 */*/__pycache__/
 logs/
-agent_data/
+agent_data/
+
+.env
80 changes: 80 additions & 0 deletions bolna/llms/groq.py
@@ -0,0 +1,80 @@
+import os
+import time
+from dotenv import load_dotenv
+import litellm
+
+from bolna.helpers.logger_config import configure_logger
+from bolna.helpers.utils import json_to_pydantic_schema
+from bolna.llms.llm import BaseLLM
+
+
+logger = configure_logger(__name__)
+load_dotenv()
+
+class GroqLLM(BaseLLM):
+    def __init__(self, model="llama3-8b-8192", max_tokens=100, buffer_size=40, temperature=0.0, **kwargs):
+        super().__init__(max_tokens, buffer_size)
+        self.model = model
+        self.started_streaming = False
+        self.model_args = {"max_tokens": max_tokens, "temperature": temperature, "model": self.model}
+        self.api_key = kwargs.get("llm_key", os.getenv('GROQ_API_KEY'))
+
+        if self.api_key:
+            self.model_args["api_key"] = self.api_key
+        if "llm_key" in kwargs:
+            self.model_args["api_key"] = kwargs["llm_key"]
+
+
+    async def generate_stream(self, messages, synthesize=True, request_json=False):
+        answer, buffer = "", ""
+        model_args = self.model_args.copy()
+        model_args["messages"] = messages
+        model_args["stream"] = True
+
+        logger.info(f"Request to model: {self.model}: {messages} and model args {model_args}")
+        latency = False
+        start_time = time.time()
+
+        async for chunk in await litellm.acompletion(api_key=self.api_key, model=f"groq/{self.model}", messages=messages, stream=True, max_tokens=self.max_tokens, request_json=request_json):
+            if not self.started_streaming:
+                first_chunk_time = time.time()
+                latency = first_chunk_time - start_time
+                logger.info(f"LLM Latency: {latency:.2f} s")
+                self.started_streaming = True
+            if (text_chunk := chunk['choices'][0]['delta'].content) and not chunk['choices'][0].finish_reason:
+                answer += text_chunk
+                buffer += text_chunk
+
+                if len(buffer) >= self.buffer_size and synthesize:
+                    text = ' '.join(buffer.split(" ")[:-1])
+                    yield text, False, latency, False
+                    buffer = buffer.split(" ")[-1]
+
+        if buffer:
+            yield buffer, True, latency, False
+        else:
+            yield answer, True, latency, False
+        self.started_streaming = False
+        logger.info(f"Time to generate response {time.time() - start_time} {answer}")
+
+
+    async def generate(self, messages, stream=False, request_json=False):
+        text = ""
+        model_args = self.model_args.copy()
+        model_args["model"] = self.model
+        model_args["messages"] = messages
+        model_args["stream"] = stream
+
+        if request_json:
+            model_args['response_format'] = {
+                "type": "json_object",
+                "schema": json_to_pydantic_schema('{"classification_label": "classification label goes here"}')
+            }
+        logger.info(f'Request to Groq LLM {model_args}')
+        try:
+            completion = await litellm.acompletion(**model_args)
+            text = completion.choices[0].message.content
+            logger.debug(completion)  # Changed to debug for non-error logging
+        except Exception as e:
+            logger.error(f'Error generating response {e}')
+        return text
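
For context, a minimal usage sketch of the new wrapper (assuming GROQ_API_KEY is exported; the prompt is illustrative):

import asyncio

from bolna.llms.groq import GroqLLM

async def main():
    # Key is picked up from GROQ_API_KEY unless llm_key is passed in kwargs
    llm = GroqLLM(model="llama3-8b-8192", max_tokens=100)
    messages = [{"role": "user", "content": "Say hello in one short sentence."}]

    # Non-streaming path: returns the full completion text
    print(await llm.generate(messages))

    # Streaming path: yields (text, is_final, first_chunk_latency, _) tuples
    async for text, is_final, latency, _ in llm.generate_stream(messages):
        print(text, is_final, latency)

asyncio.run(main())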
11 changes: 8 additions & 3 deletions bolna/models.py
@@ -80,17 +80,22 @@ class Transcriber(BaseModel):
     encoding: Optional[str] = "linear16"
     endpointing: Optional[int] = 400
     keywords: Optional[str] = None
-    task:Optional[str] = "transcribe"
+    task: Optional[str] = "transcribe"
     provider: Optional[str] = "deepgram"

     @validator("provider")
     def validate_model(cls, value):
         print(f"value {value}, PROVIDERS {list(SUPPORTED_TRANSCRIBER_PROVIDERS.keys())}")
         return validate_attribute(value, list(SUPPORTED_TRANSCRIBER_PROVIDERS.keys()))

+    # Only whisper works well with russian and kazakh
     @validator("language")
-    def validate_language(cls, value):
-        return validate_attribute(value, ["en", "hi", "es", "fr", "pt", "ko", "ja", "zh", "de", "it", "pt-BR"])
+    def validate_language(cls, value, values):
+        supported_languages = ["en", "hi", "es", "fr", "pt", "ko", "ja", "zh", "de", "it", "pt-BR"]
+        if values.get('model') == 'whisper':
+            supported_languages.append('ru')
+            supported_languages.append('kk')
+        return validate_attribute(value, supported_languages)


 class Synthesizer(BaseModel):
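
In effect, ru and kk now validate only when the transcriber model is whisper (Pydantic v1 validators receive earlier fields through values, so this relies on model being declared above language on the Transcriber model). A quick sketch of the intended behaviour, assuming the remaining Transcriber fields keep their defaults:

from pydantic import ValidationError
from bolna.models import Transcriber

Transcriber(model="whisper", language="ru")    # accepted
try:
    Transcriber(model="nova-2", language="ru") # rejected by validate_language
except ValidationError as e:
    print(e)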
7 changes: 5 additions & 2 deletions bolna/providers.py
@@ -1,3 +1,5 @@
+from bolna.llms.groq import GroqLLM
+from bolna.synthesizer.cambai_synthesizer import CambAiSynthesizer
 from bolna.transcriber.bodhi_transcriber import BodhiTranscriber
 from .synthesizer import PollySynthesizer, XTTSSynthesizer, ElevenlabsSynthesizer, OPENAISynthesizer, FourieSynthesizer, DeepgramSynthesizer, MeloSynthesizer, StylettsSynthesizer, AzureSynthesizer
 from .transcriber import DeepgramTranscriber, WhisperTranscriber
@@ -14,7 +16,8 @@
     'deepgram': DeepgramSynthesizer,
     'melotts': MeloSynthesizer,
     'styletts': StylettsSynthesizer,
-    'azuretts': AzureSynthesizer
+    'azuretts': AzureSynthesizer,
+    'cambai': CambAiSynthesizer
 }

 SUPPORTED_TRANSCRIBER_PROVIDERS = {
@@ -42,7 +45,7 @@
     'anyscale': LiteLLM,
     'custom': OpenAiLLM,
     'ola': OpenAiLLM,
-    'groq': LiteLLM,
+    'groq': GroqLLM,
     'anthropic': LiteLLM
 }
 SUPPORTED_INPUT_HANDLERS = {
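
With this, the provider registry resolves 'groq' to the dedicated wrapper instead of the generic LiteLLM one. A hypothetical lookup (the LLM dict's name sits outside the visible hunk and is assumed here):

# Registry name assumed; only the dict entries are visible in this hunk
from bolna.providers import SUPPORTED_LLM_PROVIDERS

llm_class = SUPPORTED_LLM_PROVIDERS["groq"]   # now GroqLLM rather than LiteLLM
llm = llm_class(model="llama3-8b-8192")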
142 changes: 142 additions & 0 deletions bolna/synthesizer/cambai_synthesizer.py
@@ -0,0 +1,142 @@
+import os
+import time
+from dotenv import load_dotenv
+import requests
+from bolna.helpers.logger_config import configure_logger
+from bolna.helpers.utils import create_ws_data_packet, wav_bytes_to_pcm
+from bolna.memory.cache.inmemory_scalar_cache import InmemoryScalarCache
+from .base_synthesizer import BaseSynthesizer
+
+logger = configure_logger(__name__)
+load_dotenv()
+
+class CambAiSynthesizer(BaseSynthesizer):
+    def __init__(self, voice_id, language, gender, sampling_rate=16000, buffer_size=400, caching=True, **kwargs):
+        super().__init__(False, buffer_size)  # stream is always False
+        self.voice_id = voice_id
+        self.language = language
+        self.gender = gender
+        logger.info(f"{self.voice_id} initialized")
+        self.sample_rate = str(sampling_rate)
+        self.first_chunk_generated = False
+        self.synthesized_characters = 0
+        self.caching = caching
+        if caching:
+            self.cache = InmemoryScalarCache()
+
+        # Initialize CambAI API Key
+        self.subscription_key = kwargs.get("synthesizer_key", os.getenv("CAMBAI_API_KEY"))
+        if not self.subscription_key:
+            raise ValueError("CambAI API key must be provided")
+
+    def get_synthesized_characters(self):
+        return self.synthesized_characters
+
+    def get_engine(self):
+        return "CambAI"
+
+    def supports_websocket(self):
+        return False
+
+    async def synthesize(self, text):
+        audio = await self.__generate_http(text)
+        return audio
+
+    def __send_tts_request(self, text):
+        url = "https://client.camb.ai/apis/tts"
+        headers = {
+            "x-api-key": self.subscription_key,
+            "Content-Type": "application/json"
+        }
+        payload = {
+            "text": text,
+            "voice_id": self.voice_id,
+            "language": self.language,
+            "gender": self.gender
+        }
+        response = requests.post(url, headers=headers, json=payload)
+        if response.status_code == 200:
+            return response.json().get("task_id")
+        else:
+            logger.error(f"Failed to send TTS request: {response.text}")
+            return None
+
+    def __poll_tts_status(self, task_id):
+        url = f"https://client.camb.ai/apis/tts/{task_id}"
+        headers = {
+            "x-api-key": self.subscription_key
+        }
+        while True:
+            response = requests.get(url, headers=headers)
+            if response.status_code == 200:
+                status = response.json().get("status")
+                if status == "SUCCESS":
+                    return response.json().get("run_id")
+                elif status == "PENDING":
+                    time.sleep(2)  # Polling interval
+                else:
+                    logger.error(f"TTS task failed with status: {status}")
+                    return None
+            else:
+                logger.error(f"Failed to poll TTS status: {response.text}")
+                return None
+
+    def __get_tts_result(self, run_id):
+        url = f"https://client.camb.ai/apis/tts_result/{run_id}"
+        headers = {
+            "x-api-key": self.subscription_key
+        }
+        response = requests.get(url, headers=headers, stream=True)
+        if response.status_code == 200:
+            audio_data = b""
+            for chunk in response.iter_content(chunk_size=1024):
+                audio_data += chunk
+            return audio_data
+        else:
+            logger.error(f"Failed to get TTS result: {response.text}")
+            return None
+
+    async def __generate_http(self, text):
+        task_id = self.__send_tts_request(text)
+        if task_id:
+            run_id = self.__poll_tts_status(task_id)
+            if run_id:
+                return self.__get_tts_result(run_id)
+        return None
+
+    async def generate(self):
+        while True:
+            logger.info("Generating TTS response")
+            message = await self.internal_queue.get()
+            logger.info(f"Generating TTS response for message: {message}")
+            meta_info, text = message.get("meta_info"), message.get("data")
+            if self.caching:
+                logger.info("Caching is on")
+                cached_message = self.cache.get(text)
+                if cached_message:
+                    logger.info(f"Cache hit and hence returning quickly {text}")
+                    message = cached_message
+                else:
+                    logger.info(f"Not a cache hit {list(self.cache.data_dict)}")
+                    self.synthesized_characters += len(text)
+                    message = await self.__generate_http(text)
+                    self.cache.set(text, message)
+            else:
+                logger.info("No caching present")
+                self.synthesized_characters += len(text)
+                message = await self.__generate_http(text)
+
+            if not self.first_chunk_generated:
+                meta_info["is_first_chunk"] = True
+                self.first_chunk_generated = True
+            else:
+                meta_info["is_first_chunk"] = False
+            if "end_of_llm_stream" in meta_info and meta_info["end_of_llm_stream"]:
+                meta_info["end_of_synthesizer_stream"] = True
+                self.first_chunk_generated = False
+
+            meta_info['text'] = text
+            meta_info['format'] = 'wav'
+            message = wav_bytes_to_pcm(message)
+
+            yield create_ws_data_packet(message, meta_info)
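
The synthesizer wraps CambAI's job-style HTTP API: submit the text, poll the task until SUCCESS, then download the audio by run_id. The same flow as a standalone sketch (endpoints as used in the diff; CAMBAI_API_KEY assumed set, payload values illustrative):

import os
import time

import requests

BASE = "https://client.camb.ai/apis"
headers = {"x-api-key": os.environ["CAMBAI_API_KEY"]}

# 1. Submit the TTS job and get back a task id
payload = {"text": "Hello!", "voice_id": 1234, "language": 1, "gender": 1}
task_id = requests.post(f"{BASE}/tts", headers=headers, json=payload).json()["task_id"]

# 2. Poll the task until it leaves PENDING (a failed status would need handling here)
while True:
    body = requests.get(f"{BASE}/tts/{task_id}", headers=headers).json()
    if body["status"] != "PENDING":
        break
    time.sleep(2)

# 3. Fetch the rendered audio for the run id
audio = requests.get(f"{BASE}/tts_result/{body['run_id']}", headers=headers).content
with open("out.wav", "wb") as f:
    f.write(audio)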
3 changes: 1 addition & 2 deletions bolna/synthesizer/elevenlabs_synthesizer.py
@@ -171,8 +171,7 @@ async def generate(self):
             audio = message
         else:
             self.meta_info['format'] = "wav"
-            audio = resample(convert_audio_to_wav(message, source_format="mp3"), int(self.sampling_rate),
-                             format="wav")
+            audio = resample(message, int(self.sampling_rate))

         yield create_ws_data_packet(audio, self.meta_info)
         if not self.first_chunk_generated:
4 changes: 2 additions & 2 deletions bolna/synthesizer/openai_synthesizer.py
@@ -52,7 +52,7 @@ async def __generate_stream(self, text):
         spoken_response = await self.async_client.audio.speech.create(
             model=self.model,
             voice=self.voice,
-            response_format="mp3",
+            response_format="wav",
             input=text
         )

@@ -71,7 +71,7 @@ async def generate(self):
         if not self.first_chunk_generated:
             meta_info["is_first_chunk"] = True
             self.first_chunk_generated = True
-        yield create_ws_data_packet(resample(convert_audio_to_wav(chunk, 'mp3'), self.sample_rate, format="wav"), meta_info)
+        yield create_ws_data_packet(resample(chunk, self.sample_rate, format="wav"), meta_info)

         if "end_of_llm_stream" in meta_info and meta_info["end_of_llm_stream"]:
             meta_info["end_of_synthesizer_stream"] = True
8 changes: 4 additions & 4 deletions bolna/transcriber/deepgram_transcriber.py
@@ -20,13 +20,13 @@


 class DeepgramTranscriber(BaseTranscriber):
-    def __init__(self, telephony_provider, input_queue=None, model='nova-2', stream=True, language="en", endpointing="400",
+    def __init__(self, telephony_provider, input_queue=None, model='whisper', stream=True, language="en", endpointing="400",
                  sampling_rate="16000", encoding="linear16", output_queue=None, keywords=None,
                  process_interim_results="true", **kwargs):
         logger.info(f"Initializing transcriber {kwargs}")
         super().__init__(input_queue)
         self.endpointing = endpointing
-        self.language = language if model == "nova-2" else "en"
+        self.language = language if model == "whisper" else "en"
         self.stream = stream
         self.provider = telephony_provider
         self.heartbeat_task = None
@@ -43,8 +43,8 @@ def __init__(self, telephony_provider, input_queue=None, model='nova-2', stream=
         self.transcription_cursor = 0.0
         logger.info(f"self.stream: {self.stream}")
         self.interruption_signalled = False
-        if 'nova-2' not in self.model:
-            self.model = "nova-2"
+        if 'whisper' not in self.model:
+            self.model = "whisper"
         if not self.stream:
             self.api_url = f"https://{self.deepgram_host}/v1/listen?model={self.model}&filler_words=true&language={self.language}"
             self.session = aiohttp.ClientSession()
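
Under the new defaults, any model other than whisper is coerced back to whisper, and the non-streaming listen URL is built from that model. For illustration (host value hypothetical; the real one comes from the transcriber's configuration):

model, language = "whisper", "en"
deepgram_host = "api.deepgram.com"  # hypothetical default host
api_url = f"https://{deepgram_host}/v1/listen?model={model}&filler_words=true&language={language}"
# -> https://api.deepgram.com/v1/listen?model=whisper&filler_words=true&language=en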