add-assistant #2

Merged
Ari merged 8 commits from add-assistant into main 2024-08-06 01:14:25 +00:00
3 changed files with 403 additions and 0 deletions
Showing only changes of commit 8f89bf0331 - Show all commits

View File

@ -0,0 +1,117 @@
import os
from openai import OpenAI
class ConfigureAssistant:
"""
A class to configure an OpenAI assistant for aiding designers using the ArchiMajor project.
This class handles the creation of the assistant, the setup of vector stores, and the upload of files for context.
"""
def __init__(self, root_path: str):
"""
Initialize the ConfigureAssistant with the given root path and OpenAI client.
:param root_path: The root directory path where support files are located.
"""
self.root_path = root_path
self.client = OpenAI()
self.assistant = None
self.vector_store = None
def create_assistant(self):
"""
Create an OpenAI assistant with specific instructions and tools.
"""
AI_ASSISTANT_INSTRUCTIONS = """You are an assistant to aid designers who are either using the ArchiMajor board or designing a new board based off this one. They may ask questions about specific components or interconnects within this project.
You have the following reference documents
1. Schematic (PDF)
2. Assortment of datasheets for chips used in design (PDF)
3. Bill of Materials (CSV)
These documents will provide you context to answer these questions with high level of confidence. Use your retrieval augmented generative (RAG) capabilities to leverage the files provided to you.
"""
# Create the assistant with the given instructions, model, temperature, and tools
self.assistant = self.client.beta.assistants.create(
name="A bot to answer questions about the ArchiMajor Board",
instructions=AI_ASSISTANT_INSTRUCTIONS,
model="gpt-4o",
temperature=0.1,
tools=[{"type": "file_search"}, {"type": "code_interpreter"}],
)
def get_file_paths(self, excluded_folders=None):
"""
Retrieve all file paths within the root directory and its subdirectories.
:param excluded_folders: A list of folders to exclude from the file path collection.
:return: A list of file paths.
"""
excluded_folders = excluded_folders or []
file_paths = []
# Walk through the directory tree and collect all file paths, excluding specific folders
for root, dirs, files in os.walk(self.root_path):
if any(folder in root for folder in excluded_folders):
continue
for file in files:
file_paths.append(os.path.join(root, file))
return file_paths
def create_vector_store(self):
"""
Create a vector store for storing design documents.
"""
# Create a new vector store with the name "Design Documents"
self.vector_store = self.client.beta.vector_stores.create(
name="Design Documents"
)
def upload_files_to_vector_store(self, file_paths):
"""
Upload files to the vector store one by one and log the status of each upload.
:param file_paths: A list of file paths to upload.
"""
for path in file_paths:
try:
with open(path, "rb") as file_stream:
# Upload the file to the vector store
file_batch = self.client.beta.vector_stores.file_batches.upload_and_poll(
vector_store_id=self.vector_store.id, files=[file_stream]
)
print(f"Successfully uploaded: {path}")
except Exception as e:
print(f"Failed to upload: {path} with error: {e}")
def update_assistant_with_vector_store(self):
"""
Update the assistant to use the created vector store for file search.
"""
# Update the assistant with the new vector store's ID for file search capability
self.assistant = self.client.beta.assistants.update(
assistant_id=self.assistant.id,
tool_resources={
"file_search": {"vector_store_ids": [self.vector_store.id]}
},
)
def configure(self):
"""
Configure the assistant by creating it, retrieving file paths, creating a vector store,
uploading files to the vector store, and updating the assistant with the vector store.
"""
self.create_assistant()
file_paths = self.get_file_paths(excluded_folders=[".git"])
self.create_vector_store()
self.upload_files_to_vector_store(file_paths)
self.update_assistant_with_vector_store()
if __name__ == "__main__":
# Define the root path for support files
root_path = os.path.dirname(os.path.dirname(__file__))
# Create an instance of ConfigureAssistant with the root path
configurator = ConfigureAssistant(root_path=root_path)
# Configure the assistant
configurator.configure()

View File

@ -0,0 +1,81 @@
import os
import logging
from openai import OpenAI
class OpenAIResourceManager:
"""
A class to manage OpenAI resources such as assistants, vector stores, and files.
Provides methods to delete all resources of each type.
"""
def __init__(self, api_key: str):
"""
Initialize the OpenAIResourceManager with the given API key and configure logging.
:param api_key: The API key for OpenAI.
"""
self.client = OpenAI(api_key=api_key)
logging.basicConfig(level=logging.INFO)
def delete_all_assistants(self):
"""
Delete all assistants associated with the OpenAI account.
"""
try:
# Retrieve the list of all assistants
assistants = self.client.beta.assistants.list()
# Loop through each assistant and delete it
for assistant in assistants.data:
self.client.beta.assistants.delete(assistant.id)
logging.info(f"Deleted assistant: {assistant.id}")
except Exception as e:
logging.error(f"Failed to delete assistants: {e}")
raise
def delete_all_vector_stores(self):
"""
Delete all vector stores associated with the OpenAI account.
"""
try:
# Retrieve the list of all vector stores
vector_stores = self.client.beta.vector_stores.list()
# Loop through each vector store and delete it
for vector_store in vector_stores.data:
self.client.beta.vector_stores.delete(vector_store.id)
logging.info(f"Deleted vector store: {vector_store.id}")
except Exception as e:
logging.error(f"Failed to delete vector stores: {e}")
raise
def delete_all_files(self):
"""
Delete all files associated with the OpenAI account.
"""
try:
# Retrieve the list of all files
files = self.client.files.list()
# Loop through each file and delete it
for file in files.data:
self.client.files.delete(file.id)
logging.info(f"Deleted file: {file.id}")
except Exception as e:
logging.error(f"Failed to delete files: {e}")
raise
def delete_all_resources(self):
"""
Delete all assistants, vector stores, and files associated with the OpenAI account.
"""
self.delete_all_assistants()
self.delete_all_vector_stores()
self.delete_all_files()
if __name__ == "__main__":
# Get the OpenAI API key from the environment variables
api_key = os.getenv("OPENAI_API_KEY")
# Create an instance of the OpenAIResourceManager
manager = OpenAIResourceManager(api_key=api_key)
# Delete all resources
manager.delete_all_resources()

205
assistant/QueryAssistant.py Normal file
View File

@ -0,0 +1,205 @@
import argparse
import logging
import time
from openai import OpenAI
from openai import AssistantEventHandler
from typing_extensions import override
# Configure logging
logging.basicConfig(level=logging.INFO)
class QueryAssistant:
"""
A class to manage querying an OpenAI assistant.
Provides methods to create threads, send messages, and stream or fetch responses.
"""
def __init__(self, assistant_id: str):
"""
Initialize the QueryAssistant with the given assistant ID and OpenAI client.
:param assistant_id: The ID of the OpenAI assistant.
"""
self.client = OpenAI()
self.assistant_id = assistant_id
def create_thread(self):
"""
Create a new thread for the assistant.
:return: The created thread object.
"""
logging.info("Creating a new thread...")
thread = self.client.beta.threads.create()
logging.info(f"Thread created: {thread.id}")
return thread
def create_message(self, thread_id: str, content: str):
"""
Create a message in the specified thread with the given content.
:param thread_id: The ID of the thread.
:param content: The content of the message.
:return: The created message object.
"""
logging.info(f"Creating message in thread {thread_id}...")
message = self.client.beta.threads.messages.create(
thread_id=thread_id, role="user", content=content
)
logging.info("Message created")
return message
def stream_response(self, thread_id: str):
"""
Stream the response from the assistant for the specified thread.
:param thread_id: The ID of the thread.
"""
logging.info(f"Streaming response for thread {thread_id}...")
with self.client.beta.threads.runs.stream(
thread_id=thread_id,
assistant_id=self.assistant_id,
event_handler=self.EventHandler(),
) as stream:
stream.until_done()
logging.info("Response streaming completed")
def fetch_response(self, thread_id: str):
"""
Fetch the response from the assistant for the specified thread (non-streaming).
:param thread_id: The ID of the thread.
"""
logging.info(f"Fetching response for thread {thread_id}...")
run = self.client.beta.threads.runs.create_and_poll(
thread_id=thread_id,
assistant_id=self.assistant_id
)
# Poll the run status with a delay to reduce the number of GET requests
while run.status != 'completed' and run.status != 'failed':
time.sleep(2) # Add a 2-second delay between checks
run = self.client.beta.threads.runs.retrieve(thread_id=thread_id, run_id=run.id)
logging.info(f"Run status: {run.status}")
if run.status == 'completed':
messages = self.client.beta.threads.messages.list(thread_id=thread_id).data
for message in messages:
if message.role == 'assistant':
for content in message.content:
if content.type == 'text':
print(content.text.value)
else:
logging.error(f"Run failed with status: {run.status}")
if run.incomplete_details:
logging.error(f"Incomplete details: {run.incomplete_details}")
class EventHandler(AssistantEventHandler):
"""
A class to handle events from the assistant's response stream.
"""
@override
def on_text_created(self, text) -> None:
"""
Handle the event when text is created by the assistant.
:param text: The created text.
"""
logging.info("Text created by assistant")
print(f"\nassistant > ", end="", flush=True)
@override
def on_text_delta(self, delta, snapshot):
"""
Handle the event when there is a delta in the assistant's response.
:param delta: The response delta.
:param snapshot: The snapshot of the response.
"""
print(delta.value, end="", flush=True)
def on_tool_call_created(self, tool_call):
"""
Handle the event when a tool call is created by the assistant.
:param tool_call: The created tool call.
"""
logging.info(f"Tool call created: {tool_call.type}")
print(f"\nassistant > {tool_call.type}\n", flush=True)
def on_tool_call_delta(self, delta, snapshot):
"""
Handle the event when there is a delta in the assistant's tool call.
:param delta: The tool call delta.
:param snapshot: The snapshot of the tool call.
"""
if delta.type == "code_interpreter":
if delta.code_interpreter.input:
print(delta.code_interpreter.input, end="", flush=True)
if delta.code_interpreter.outputs:
print(f"\n\noutput >", flush=True)
for output in delta.code_interpreter.outputs:
if output.type == "logs":
print(f"\n{output.logs}", flush=True)
def main(query: str, assistant_id: str, context: str, use_streaming: bool):
"""
The main function to run the assistant query.
:param query: The query to ask the assistant.
:param assistant_id: The ID of the assistant.
:param context: The context to set before the query.
:param use_streaming: Boolean flag to determine if streaming should be used.
"""
assistant = QueryAssistant(assistant_id=assistant_id)
thread = assistant.create_thread()
# Merge the context and query into a single message
full_query = f"Context: {context}\nQuery: {query}"
# Print the full query
print("\n" + "=" * 100)
print(f"{full_query}")
print("=" * 100 + "\n")
# Send the message
assistant.create_message(thread_id=thread.id, content=full_query)
if use_streaming:
assistant.stream_response(thread_id=thread.id)
else:
assistant.fetch_response(thread_id=thread.id)
print("\n")
if __name__ == "__main__":
# Default query and context
DEFAULT_QUERY = "What are you capable of as an assistant?"
DEFAULT_CONTEXT = "Use your vector store to answer questions about the Arty A7 Evaluation Board. Take time to understand the context and introspect yourself. If you don't know the answer simply respond with 'I don't know'. It is NEVER okay to return an empty response."
# Parse command line arguments
parser = argparse.ArgumentParser(description="Run an assistant query.")
parser.add_argument(
"--query",
type=str,
default=DEFAULT_QUERY,
help="The query to ask the assistant.",
)
parser.add_argument(
"--assistant_id",
type=str,
default="asst_JUXTF2T6n3RFDjkolNqdtPxj",
help="The assistant ID to use.",
)
parser.add_argument(
"--context",
type=str,
default=DEFAULT_CONTEXT,
help="The context to set before the query.",
)
parser.add_argument(
"--use-streaming",
action="store_true",
help="Flag to determine if streaming should be used.",
)
# Run the main function with parsed arguments
args = parser.parse_args()
main(args.query, args.assistant_id, args.context, args.use_streaming)