add-assistant #2

Merged
Ari merged 8 commits from add-assistant into main 2024-08-06 01:14:25 +00:00
3 changed files with 593 additions and 0 deletions

View File

@ -0,0 +1,187 @@
import os
import io
from openai import OpenAI
class ConfigureAssistant:
    """
    Configure an OpenAI assistant for aiding designers using the ArchiMajor project.

    Handles creation of the assistant, setup of a vector store, and upload of
    the files that give the assistant context for file-search (RAG) answers.
    """

    # Extensions the OpenAI file_search tool can ingest directly.
    SUPPORTED_FORMATS = {
        "c",
        "cpp",
        "css",
        "docx",
        "gif",
        "html",
        "java",
        "jpeg",
        "jpg",
        "js",
        "json",
        "md",
        "pdf",
        "php",
        # "png", # PNG is supported but not actually parsed
        "pptx",
        "py",
        "rb",
        # "tar", # TAR is supported but not actually parsed
        "tex",
        "ts",
        "txt",
        "webp",
        "xlsx",
        "xml",
        "zip",
        # "csv", # CSV is supported but not actually parsed so we're going to treat it as text
    }

    def __init__(self, root_path: str):
        """
        Initialize the ConfigureAssistant with the given root path and OpenAI client.

        :param root_path: The root directory path where support files are located.
        """
        self.root_path = root_path
        self.client = OpenAI()
        # Populated by create_assistant() / create_vector_store() respectively.
        self.assistant = None
        self.vector_store = None

    def create_assistant(self):
        """
        Create an OpenAI assistant with specific instructions and tools.
        """
        AI_ASSISTANT_INSTRUCTIONS = """You are an assistant to aid designers who are either using the ArchiMajor board or designing a new board based off this one. They may ask questions about specific components or interconnects within this project.
You have the following reference documents
1. Schematic (PDF)
2. Assortment of datasheets for chips used in design (PDF)
3. Bill of Materials (CSV)
These documents will provide you context to answer these questions with high level of confidence. Use your retrieval augmented generative (RAG) capabilities to leverage the files provided to you.
"""
        # Create the assistant with the given instructions, model, temperature, and tools.
        # Only file_search is enabled: code_interpreter doesn't seem to return
        # a delta properly when streaming.
        self.assistant = self.client.beta.assistants.create(
            name="A bot to answer questions about the ArchiMajor Board",
            instructions=AI_ASSISTANT_INSTRUCTIONS,
            model="gpt-4o",
            temperature=0.1,
            tools=[{"type": "file_search"}],
        )

    def get_file_paths(self, excluded_folders=None):
        """
        Retrieve all file paths within the root directory and its subdirectories.

        :param excluded_folders: Folder *names* to exclude, matched as exact
            path components (so excluding ".git" no longer also skips
            ".github" or any path merely containing the name as a substring).
        :return: A list of file paths.
        """
        excluded = set(excluded_folders or [])
        file_paths = []
        for root, dirs, files in os.walk(self.root_path):
            # Prune excluded directories in place so os.walk never descends
            # into them. The previous substring test ("folder in root") also
            # excluded unrelated paths such as ".github" when ".git" was given.
            dirs[:] = [d for d in dirs if d not in excluded]
            if excluded.intersection(root.split(os.sep)):
                continue
            for file in files:
                file_paths.append(os.path.join(root, file))
        return file_paths

    def preprocess_file(self, file_path):
        """
        Decide how a file should be uploaded based on its extension.

        :param file_path: The file path to preprocess.
        :return: A tuple of (filename to use during upload, flag indicating
            whether the file should be uploaded).
        """
        extension = file_path.split(".")[-1].lower()
        # Binary/EDA formats that carry no useful text for retrieval.
        excluded_extensions = ["schdoc", "exe", "so", "dll", "outjob", "pcbdoc", "png", "tar"]
        if extension in self.SUPPORTED_FORMATS:
            return os.path.basename(file_path), True
        if extension in excluded_extensions:
            print(f"Skipping excluded file: {file_path}")
            return os.path.basename(file_path), False
        # Unknown extension: spoof a ".txt" suffix so the API accepts it as text.
        new_file_name = f"{os.path.basename(file_path)}.txt"
        print(f"Spoofing unsupported file: {file_path} as {new_file_name}")
        return new_file_name, True

    def create_vector_store(self):
        """
        Create a vector store for storing design documents.
        """
        self.vector_store = self.client.beta.vector_stores.create(
            name="Design Documents"
        )

    def upload_files_to_vector_store(self, file_paths):
        """
        Upload files to the vector store one by one and log the status of each upload.

        :param file_paths: A list of file paths to upload.
        """
        for path in file_paths:
            new_filename, should_upload = self.preprocess_file(path)
            if not should_upload:
                continue
            try:
                with open(path, "rb") as file_stream:
                    file_content = file_stream.read()
                # Wrap the bytes in a BytesIO whose .name carries the
                # (possibly spoofed) filename the API will see.
                spoofed_file = io.BytesIO(file_content)
                spoofed_file.name = new_filename
                # Upload the file to the vector store and wait for processing.
                self.client.beta.vector_stores.file_batches.upload_and_poll(
                    vector_store_id=self.vector_store.id, files=[spoofed_file]
                )
                print(f"Successfully uploaded: {new_filename}")
            except Exception as e:
                print(f"Failed to upload: {new_filename} with error: {e}")

    def update_assistant_with_vector_store(self):
        """
        Update the assistant to use the created vector store for file search.
        """
        # Attach the vector store's ID so file_search can query it.
        self.assistant = self.client.beta.assistants.update(
            assistant_id=self.assistant.id,
            tool_resources={
                "file_search": {"vector_store_ids": [self.vector_store.id]}
            },
        )

    def configure(self):
        """
        Configure the assistant end to end: create it, collect file paths,
        create a vector store, upload the files, and attach the store.
        """
        self.create_assistant()
        file_paths = self.get_file_paths(excluded_folders=[".git"])
        self.create_vector_store()
        self.upload_files_to_vector_store(file_paths)
        self.update_assistant_with_vector_store()
if __name__ == "__main__":
    # Support files live one directory above this script.
    project_root = os.path.dirname(os.path.dirname(__file__))
    assistant_config = ConfigureAssistant(root_path=project_root)

    # Dry-run pass: preprocess each candidate so its upload disposition
    # (upload / spoof / skip) is printed before anything is sent.
    candidate_paths = assistant_config.get_file_paths(excluded_folders=[".git"])
    print("Files to be uploaded:")
    for candidate in candidate_paths:
        assistant_config.preprocess_file(candidate)

    # Full configuration: assistant + vector store + uploads.
    assistant_config.configure()

View File

@ -0,0 +1,197 @@
from openai import OpenAI
import argparse
import logging
import os
import pandas as pd
import textwrap
class OpenAIResourceManager:
    """
    Manage OpenAI resources such as assistants, vector stores, and files.

    Offers retrieval, bulk deletion, and markdown-table display of each
    resource type tied to the account.
    """

    def __init__(self, api_key: str):
        """
        Initialize the OpenAIResourceManager with the given API key and configure logging.

        :param api_key: The API key for OpenAI.
        """
        self.client = OpenAI(api_key=api_key)
        logging.basicConfig(level=logging.INFO)

    def get_all_assistants(self):
        """
        Retrieve all assistants associated with the OpenAI account.

        :return: A list of assistants.
        """
        try:
            return self.client.beta.assistants.list().data
        except Exception as err:
            logging.error(f"Failed to retrieve assistants: {err}")
            raise

    def get_all_vector_stores(self):
        """
        Retrieve all vector stores associated with the OpenAI account.

        :return: A list of vector stores.
        """
        try:
            return self.client.beta.vector_stores.list().data
        except Exception as err:
            logging.error(f"Failed to retrieve vector stores: {err}")
            raise

    def get_all_files(self):
        """
        Retrieve all files associated with the OpenAI account.

        :return: A list of files.
        """
        try:
            return self.client.files.list().data
        except Exception as err:
            logging.error(f"Failed to retrieve files: {err}")
            raise

    def delete_all_assistants(self):
        """
        Delete all assistants associated with the OpenAI account.
        """
        try:
            for assistant in self.get_all_assistants():
                self.client.beta.assistants.delete(assistant.id)
                logging.info(f"Deleted assistant: {assistant.id}")
        except Exception as err:
            logging.error(f"Failed to delete assistants: {err}")
            raise

    def delete_all_vector_stores(self):
        """
        Delete all vector stores associated with the OpenAI account.
        """
        try:
            for vector_store in self.get_all_vector_stores():
                self.client.beta.vector_stores.delete(vector_store.id)
                logging.info(f"Deleted vector store: {vector_store.id}")
        except Exception as err:
            logging.error(f"Failed to delete vector stores: {err}")
            raise

    def delete_all_files(self):
        """
        Delete all files associated with the OpenAI account.
        """
        try:
            for file in self.get_all_files():
                self.client.files.delete(file.id)
                logging.info(f"Deleted file: {file.id}")
        except Exception as err:
            logging.error(f"Failed to delete files: {err}")
            raise

    def delete_all_resources(self):
        """
        Delete all assistants, vector stores, and files associated with the OpenAI account.
        """
        self.delete_all_assistants()
        self.delete_all_vector_stores()
        self.delete_all_files()

    def wrap_text(self, s, width=30):
        """
        Wrap text to a specific width.

        :param s: The string to wrap.
        :param width: The maximum width of each line.
        :return: The wrapped string.
        """
        wrapped_lines = textwrap.wrap(s, width)
        return "\n".join(wrapped_lines)

    def _render_table(self, title, records):
        # Shared rendering path for the show_* methods: wrap every field
        # value, then print the records as a markdown table under the title.
        rows = [
            {key: self.wrap_text(str(value)) for key, value in record.dict().items()}
            for record in records
        ]
        print(title)
        print(pd.DataFrame(rows).to_markdown(index=False))

    def show_all_assistants(self):
        """
        Display all assistants in a table.
        """
        self._render_table("Assistants:", self.get_all_assistants())

    def show_all_vector_stores(self):
        """
        Display all vector stores in a table.
        """
        self._render_table("Vector Stores:", self.get_all_vector_stores())

    def show_all_files(self):
        """
        Display all files in a table.
        """
        self._render_table("Files:", self.get_all_files())

    def show_all_resources(self):
        """
        Display all resources in a table.
        """
        self.show_all_assistants()
        print("")
        self.show_all_vector_stores()
        print("")
        self.show_all_files()
def main():
    """
    Show all OpenAI account resources, deleting them first when --delete is passed.

    :raises SystemExit: If the OPENAI_API_KEY environment variable is not set.
    """
    parser = argparse.ArgumentParser(
        description="Displays or deletes all resources associated with the OpenAI account."
    )
    parser.add_argument(
        "--delete", action="store_true", help="Flag to delete resources instead of showing them."
    )
    args = parser.parse_args()

    # Fail fast with a clear message when the key is absent; passing None to
    # the client would otherwise surface as an obscure error at first API call.
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise SystemExit("OPENAI_API_KEY environment variable is not set.")

    manager = OpenAIResourceManager(api_key=api_key)

    # Optionally delete everything, then show whatever remains.
    if args.delete:
        manager.delete_all_resources()
    manager.show_all_resources()


if __name__ == "__main__":
    main()

209
assistant/QueryAssistant.py Normal file
View File

@ -0,0 +1,209 @@
import argparse
import logging
import time
from openai import OpenAI
from openai import AssistantEventHandler
from typing_extensions import override
# Configure root logging at import time so the module's info-level
# progress messages (thread creation, run status, etc.) are visible.
logging.basicConfig(level=logging.INFO)
class QueryAssistant:
    """
    A class to manage querying an OpenAI assistant.

    Provides methods to create threads, send messages, and stream or fetch
    responses.
    """

    # Every status after which a run will never change again. The previous
    # polling loop only recognised "completed" and "failed", so a run that
    # ended "cancelled", "expired", or "incomplete" made fetch_response
    # spin forever re-fetching the same terminal status.
    _TERMINAL_RUN_STATUSES = ("completed", "failed", "cancelled", "expired", "incomplete")

    def __init__(self, assistant_id: str):
        """
        Initialize the QueryAssistant with the given assistant ID and OpenAI client.

        :param assistant_id: The ID of the OpenAI assistant.
        """
        self.client = OpenAI()
        self.assistant_id = assistant_id

    def create_thread(self):
        """
        Create a new thread for the assistant.

        :return: The created thread object.
        """
        logging.info("Creating a new thread...")
        thread = self.client.beta.threads.create()
        logging.info(f"Thread created: {thread.id}")
        return thread

    def create_message(self, thread_id: str, content: str):
        """
        Create a message in the specified thread with the given content.

        :param thread_id: The ID of the thread.
        :param content: The content of the message.
        :return: The created message object.
        """
        logging.info(f"Creating message in thread {thread_id}...")
        message = self.client.beta.threads.messages.create(
            thread_id=thread_id, role="user", content=content
        )
        logging.info("Message created")
        return message

    def stream_response(self, thread_id: str):
        """
        Stream the response from the assistant for the specified thread.

        :param thread_id: The ID of the thread.
        """
        logging.info(f"Streaming response for thread {thread_id}...")
        with self.client.beta.threads.runs.stream(
            thread_id=thread_id,
            assistant_id=self.assistant_id,
            event_handler=self.EventHandler(),
        ) as stream:
            stream.until_done()
        logging.info("Response streaming completed")

    def fetch_response(self, thread_id: str):
        """
        Fetch the response from the assistant for the specified thread (non-streaming).

        :param thread_id: The ID of the thread.
        """
        logging.info(f"Fetching response for thread {thread_id}...")
        run = self.client.beta.threads.runs.create_and_poll(
            thread_id=thread_id, assistant_id=self.assistant_id
        )
        # Poll with a delay to reduce the number of GET requests. Stop on ANY
        # terminal status -- checking only "completed"/"failed" would loop
        # forever on cancelled/expired/incomplete runs.
        while run.status not in self._TERMINAL_RUN_STATUSES:
            time.sleep(2)  # Add a 2-second delay between checks
            run = self.client.beta.threads.runs.retrieve(
                thread_id=thread_id, run_id=run.id
            )
            logging.info(f"Run status: {run.status}")
        if run.status == "completed":
            messages = self.client.beta.threads.messages.list(thread_id=thread_id).data
            # Print every text part of every assistant message in the thread.
            for message in messages:
                if message.role == "assistant":
                    for content in message.content:
                        if content.type == "text":
                            print(content.text.value)
        else:
            logging.error(f"Run failed with status: {run.status}")
            if run.incomplete_details:
                logging.error(f"Incomplete details: {run.incomplete_details}")

    class EventHandler(AssistantEventHandler):
        """
        A class to handle events from the assistant's response stream.
        """

        @override
        def on_text_created(self, text) -> None:
            """
            Handle the event when text is created by the assistant.

            :param text: The created text.
            """
            logging.info("Text created by assistant")
            print(f"\nassistant > ", end="", flush=True)

        @override
        def on_text_delta(self, delta, snapshot):
            """
            Handle the event when there is a delta in the assistant's response.

            :param delta: The response delta.
            :param snapshot: The snapshot of the response.
            """
            print(delta.value, end="", flush=True)

        def on_tool_call_created(self, tool_call):
            """
            Handle the event when a tool call is created by the assistant.

            :param tool_call: The created tool call.
            """
            logging.info(f"Tool call created: {tool_call.type}")
            print(f"\nassistant > {tool_call.type}\n", flush=True)

        def on_tool_call_delta(self, delta, snapshot):
            """
            Handle the event when there is a delta in the assistant's tool call.

            :param delta: The tool call delta.
            :param snapshot: The snapshot of the tool call.
            """
            if delta.type == "code_interpreter":
                if delta.code_interpreter.input:
                    print(delta.code_interpreter.input, end="", flush=True)
                if delta.code_interpreter.outputs:
                    print(f"\n\noutput >", flush=True)
                    for output in delta.code_interpreter.outputs:
                        if output.type == "logs":
                            print(f"\n{output.logs}", flush=True)
def main(query: str, assistant_id: str, context: str, use_streaming: bool):
    """
    Run a single assistant query end to end.

    :param query: The query to ask the assistant.
    :param assistant_id: The ID of the assistant.
    :param context: The context to set before the query.
    :param use_streaming: Whether the response should be streamed.
    """
    querier = QueryAssistant(assistant_id=assistant_id)
    conversation = querier.create_thread()

    # Context and query travel together as one user message.
    full_query = f"Context: {context}\nQuery: {query}"

    # Echo the outgoing message between separator banners.
    banner = "=" * 100
    print("\n" + banner)
    print(f"{full_query}")
    print(banner + "\n")

    querier.create_message(thread_id=conversation.id, content=full_query)

    # Either stream tokens as they arrive or poll for the finished answer.
    if use_streaming:
        querier.stream_response(thread_id=conversation.id)
    else:
        querier.fetch_response(thread_id=conversation.id)
    print("\n")
if __name__ == "__main__":
    # Defaults used when the caller supplies no overrides.
    DEFAULT_QUERY = "What are you capable of as an assistant?"
    DEFAULT_CONTEXT = "Use your vector store to answer questions about the ArchiMajor Board. Take time to understand the context and introspect. If you don't know the answer simply respond with 'I don't know'. It is NEVER okay to return an empty response."

    # Command line interface.
    cli = argparse.ArgumentParser(description="Run an assistant query.")
    cli.add_argument(
        "--query", type=str, default=DEFAULT_QUERY,
        help="The query to ask the assistant.",
    )
    cli.add_argument(
        "--assistant_id", type=str, default="asst_W20wgEl0ZModBiMWzcgC0E3E",
        help="The assistant ID to use.",
    )
    cli.add_argument(
        "--context", type=str, default=DEFAULT_CONTEXT,
        help="The context to set before the query.",
    )
    cli.add_argument(
        "--use-streaming", action="store_true",
        help="Flag to determine if streaming should be used.",
    )

    cli_args = cli.parse_args()
    main(cli_args.query, cli_args.assistant_id, cli_args.context, cli_args.use_streaming)