import io
import os

from openai import OpenAI


class ConfigureAssistant:
    """
    Configure an OpenAI assistant for aiding designers using the ArchiMajor project.

    Handles creation of the assistant, setup of a vector store, and upload of
    reference files (schematic PDF, component datasheets, BOM) that give the
    assistant retrieval (RAG) context.
    """

    # File extensions the OpenAI file_search tool ingests directly.
    SUPPORTED_FORMATS = {
        "c",
        "cpp",
        "css",
        "docx",
        "gif",
        "html",
        "java",
        "jpeg",
        "jpg",
        "js",
        "json",
        "md",
        "pdf",
        "php",
        # "png",  # PNG is accepted by the API but not actually parsed
        "pptx",
        "py",
        "rb",
        # "tar",  # TAR is accepted by the API but not actually parsed
        "tex",
        "ts",
        "txt",
        "webp",
        "xlsx",
        "xml",
        "zip",
        # "csv",  # CSV is accepted but not parsed, so we spoof it as text instead
    }

    # Extensions that are never uploaded: binaries and CAD sources with no
    # text content worth indexing.
    EXCLUDED_EXTENSIONS = {
        "schdoc", "exe", "so", "dll", "outjob", "pcbdoc", "png", "tar",
    }

    def __init__(self, root_path: str):
        """
        Initialize the ConfigureAssistant with the given root path and OpenAI client.

        :param root_path: The root directory path where support files are located.
        """
        self.root_path = root_path
        # OpenAI() reads OPENAI_API_KEY from the environment.
        self.client = OpenAI()
        self.assistant = None      # set by create_assistant()
        self.vector_store = None   # set by create_vector_store()

    def create_assistant(self):
        """
        Create an OpenAI assistant with specific instructions and tools.

        Stores the created assistant on ``self.assistant``.
        """
        AI_ASSISTANT_INSTRUCTIONS = """You are an assistant to aid designers who are either using the ArchiMajor board or designing a new board based off this one. They may ask questions about specific components or interconnects within this project.

        You have the following reference documents
        1. Schematic (PDF)
        2. Assortment of datasheets for chips used in design (PDF)
        3. Bill of Materials (CSV)

        These documents will provide you context to answer these questions with high level of confidence. Use your retrieval augmented generative (RAG) capabilities to leverage the files provided to you.
        """
        # Low temperature keeps answers grounded in the retrieved documents.
        self.assistant = self.client.beta.assistants.create(
            name="A bot to answer questions about the ArchiMajor Board",
            instructions=AI_ASSISTANT_INSTRUCTIONS,
            model="gpt-4o",
            temperature=0.1,
            # Code interpreter doesn't seem to return a delta properly, so only
            # file_search is enabled.
            tools=[{"type": "file_search"}],
        )

    def get_file_paths(self, excluded_folders=None):
        """
        Retrieve all file paths within the root directory and its subdirectories.

        :param excluded_folders: A list of directory names to exclude from the
            collection (matched as whole path components, so excluding ".git"
            does not also exclude ".github").
        :return: A list of file paths.
        """
        excluded_folders = set(excluded_folders or [])
        file_paths = []
        for root, dirs, files in os.walk(self.root_path):
            # Compare whole path components rather than substrings: a substring
            # test would wrongly drop e.g. ".github" when ".git" is excluded.
            if excluded_folders.intersection(root.split(os.sep)):
                continue
            for file in files:
                file_paths.append(os.path.join(root, file))
        return file_paths

    def preprocess_file(self, file_path):
        """
        Preprocess files based on their type.

        Unsupported-but-not-excluded files are "spoofed": renamed with a .txt
        suffix so the API will accept them as plain text.

        :param file_path: The file path to preprocess.
        :return: Tuple of (filename to use during upload, flag indicating
            whether the file should be uploaded).
        """
        extension = file_path.split(".")[-1].lower()

        if extension in self.EXCLUDED_EXTENSIONS:
            print(f"Skipping excluded file: {file_path}")
            return os.path.basename(file_path), False
        if extension in self.SUPPORTED_FORMATS:
            return os.path.basename(file_path), True
        # Not supported, not excluded: upload it under a .txt name.
        new_file_name = f"{os.path.basename(file_path)}.txt"
        print(f"Spoofing unsupported file: {file_path} as {new_file_name}")
        return new_file_name, True

    def create_vector_store(self):
        """
        Create a vector store for storing design documents.

        Stores the created vector store on ``self.vector_store``.
        """
        self.vector_store = self.client.beta.vector_stores.create(
            name="Design Documents"
        )

    def upload_files_to_vector_store(self, file_paths):
        """
        Upload files to the vector store one by one and log the status of each upload.

        :param file_paths: A list of file paths to upload.
        """
        for path in file_paths:
            new_filename, should_upload = self.preprocess_file(path)
            if not should_upload:
                continue
            try:
                with open(path, "rb") as file_stream:
                    file_content = file_stream.read()
                spoofed_file = io.BytesIO(file_content)
                spoofed_file.name = new_filename  # Spoof the filename
                # Upload one file per batch and block until it is processed.
                self.client.beta.vector_stores.file_batches.upload_and_poll(
                    vector_store_id=self.vector_store.id, files=[spoofed_file]
                )
                print(f"Successfully uploaded: {new_filename}")
            except Exception as e:
                # Best-effort: report the failure and keep uploading the rest.
                print(f"Failed to upload: {new_filename} with error: {e}")

    def update_assistant_with_vector_store(self):
        """
        Update the assistant to use the created vector store for file search.
        """
        self.assistant = self.client.beta.assistants.update(
            assistant_id=self.assistant.id,
            tool_resources={
                "file_search": {"vector_store_ids": [self.vector_store.id]}
            },
        )

    def configure(self):
        """
        Configure the assistant end to end: create it, collect file paths,
        create a vector store, upload the files, and attach the store.
        """
        self.create_assistant()
        file_paths = self.get_file_paths(excluded_folders=[".git"])
        self.create_vector_store()
        self.upload_files_to_vector_store(file_paths)
        self.update_assistant_with_vector_store()


if __name__ == "__main__":
    # Define the root path for support files (repository root, one level up).
    root_path = os.path.dirname(os.path.dirname(__file__))
    configurator = ConfigureAssistant(root_path=root_path)

    # Retrieve file paths
    file_paths = configurator.get_file_paths(excluded_folders=[".git"])

    # Preview the files that will be uploaded (the original loop computed the
    # names but never printed them, leaving the header with no output).
    print("Files to be uploaded:")
    for path in file_paths:
        new_filename, should_upload = configurator.preprocess_file(path)
        if should_upload:
            print(f"  {new_filename}")

    # Configure the assistant
    configurator.configure()
import argparse
import logging
import os
import textwrap

import pandas as pd
from openai import OpenAI


class OpenAIResourceManager:
    """
    A class to manage OpenAI resources such as assistants, vector stores, and files.
    Provides methods to delete all resources of each type and display them in tables.
    """

    def __init__(self, api_key: str):
        """
        Initialize the OpenAIResourceManager with the given API key and configure logging.

        :param api_key: The API key for OpenAI. May be None, in which case the
            OpenAI client falls back to the OPENAI_API_KEY environment variable.
        """
        self.client = OpenAI(api_key=api_key)
        logging.basicConfig(level=logging.INFO)

    def get_all_assistants(self):
        """
        Retrieve all assistants associated with the OpenAI account.

        :return: A list of assistants.
        :raises Exception: Re-raised after logging if the API call fails.
        """
        try:
            assistants = self.client.beta.assistants.list()
            return assistants.data
        except Exception as e:
            logging.error(f"Failed to retrieve assistants: {e}")
            raise

    def get_all_vector_stores(self):
        """
        Retrieve all vector stores associated with the OpenAI account.

        :return: A list of vector stores.
        :raises Exception: Re-raised after logging if the API call fails.
        """
        try:
            vector_stores = self.client.beta.vector_stores.list()
            return vector_stores.data
        except Exception as e:
            logging.error(f"Failed to retrieve vector stores: {e}")
            raise

    def get_all_files(self):
        """
        Retrieve all files associated with the OpenAI account.

        :return: A list of files.
        :raises Exception: Re-raised after logging if the API call fails.
        """
        try:
            files = self.client.files.list()
            return files.data
        except Exception as e:
            logging.error(f"Failed to retrieve files: {e}")
            raise

    def delete_all_assistants(self):
        """
        Delete all assistants associated with the OpenAI account.
        """
        try:
            assistants = self.get_all_assistants()
            for assistant in assistants:
                self.client.beta.assistants.delete(assistant.id)
                logging.info(f"Deleted assistant: {assistant.id}")
        except Exception as e:
            logging.error(f"Failed to delete assistants: {e}")
            raise

    def delete_all_vector_stores(self):
        """
        Delete all vector stores associated with the OpenAI account.
        """
        try:
            vector_stores = self.get_all_vector_stores()
            for vector_store in vector_stores:
                self.client.beta.vector_stores.delete(vector_store.id)
                logging.info(f"Deleted vector store: {vector_store.id}")
        except Exception as e:
            logging.error(f"Failed to delete vector stores: {e}")
            raise

    def delete_all_files(self):
        """
        Delete all files associated with the OpenAI account.
        """
        try:
            files = self.get_all_files()
            for file in files:
                self.client.files.delete(file.id)
                logging.info(f"Deleted file: {file.id}")
        except Exception as e:
            logging.error(f"Failed to delete files: {e}")
            raise

    def delete_all_resources(self):
        """
        Delete all assistants, vector stores, and files associated with the OpenAI account.
        """
        self.delete_all_assistants()
        self.delete_all_vector_stores()
        self.delete_all_files()

    def wrap_text(self, s, width=30):
        """
        Wrap text to a specific width.

        :param s: The string to wrap.
        :param width: The maximum width of each line.
        :return: The wrapped string.
        """
        return "\n".join(textwrap.wrap(s, width))

    def _show_table(self, title, records):
        """
        Render a list of API objects as a markdown table.

        Shared implementation for the three show_all_* methods, which were
        previously triplicated copy-paste.

        :param title: Heading printed above the table (without the colon).
        :param records: Iterable of API model objects exposing .dict().
        """
        rows = [
            # NOTE(review): .dict() is the pydantic-v1 spelling; newer openai
            # SDKs use pydantic v2 where .model_dump() is preferred — confirm
            # against the pinned SDK version before switching.
            {k: self.wrap_text(str(v)) for k, v in record.dict().items()}
            for record in records
        ]
        df = pd.DataFrame(rows)
        print(f"{title}:")
        print(df.to_markdown(index=False))

    def show_all_assistants(self):
        """
        Display all assistants in a table.
        """
        self._show_table("Assistants", self.get_all_assistants())

    def show_all_vector_stores(self):
        """
        Display all vector stores in a table.
        """
        self._show_table("Vector Stores", self.get_all_vector_stores())

    def show_all_files(self):
        """
        Display all files in a table.
        """
        self._show_table("Files", self.get_all_files())

    def show_all_resources(self):
        """
        Display all resources (assistants, vector stores, files) in tables.
        """
        self.show_all_assistants()
        print("")
        self.show_all_vector_stores()
        print("")
        self.show_all_files()


def main():
    """
    Main function that either shows or deletes resources based on the command line arguments.
    """
    parser = argparse.ArgumentParser(
        description="Displays or deletes all resources associated with the OpenAI account."
    )
    parser.add_argument(
        "--delete",
        action="store_true",
        help="Flag to delete resources instead of showing them.",
    )
    args = parser.parse_args()

    # Get the OpenAI API key from the environment variables (may be None; the
    # client itself falls back to the same variable and errors clearly if unset).
    api_key = os.getenv("OPENAI_API_KEY")
    manager = OpenAIResourceManager(api_key=api_key)

    # Delete resources conditionally, then show whatever remains.
    if args.delete:
        manager.delete_all_resources()

    manager.show_all_resources()


if __name__ == "__main__":
    main()
import argparse
import logging
import time

from openai import OpenAI
from openai import AssistantEventHandler
from typing_extensions import override

# Configure logging
logging.basicConfig(level=logging.INFO)

# States in which a run will never change again. Polling must stop on ANY of
# these; the original loop only checked "completed"/"failed", so a run ending
# "cancelled", "expired", or "incomplete" would be polled forever.
TERMINAL_RUN_STATUSES = {"completed", "failed", "cancelled", "expired", "incomplete"}


class QueryAssistant:
    """
    A class to manage querying an OpenAI assistant.
    Provides methods to create threads, send messages, and stream or fetch responses.
    """

    def __init__(self, assistant_id: str):
        """
        Initialize the QueryAssistant with the given assistant ID and OpenAI client.

        :param assistant_id: The ID of the OpenAI assistant.
        """
        self.client = OpenAI()
        self.assistant_id = assistant_id

    def create_thread(self):
        """
        Create a new thread for the assistant.

        :return: The created thread object.
        """
        logging.info("Creating a new thread...")
        thread = self.client.beta.threads.create()
        logging.info(f"Thread created: {thread.id}")
        return thread

    def create_message(self, thread_id: str, content: str):
        """
        Create a message in the specified thread with the given content.

        :param thread_id: The ID of the thread.
        :param content: The content of the message.
        :return: The created message object.
        """
        logging.info(f"Creating message in thread {thread_id}...")
        message = self.client.beta.threads.messages.create(
            thread_id=thread_id, role="user", content=content
        )
        logging.info("Message created")
        return message

    def stream_response(self, thread_id: str):
        """
        Stream the response from the assistant for the specified thread.

        :param thread_id: The ID of the thread.
        """
        logging.info(f"Streaming response for thread {thread_id}...")
        with self.client.beta.threads.runs.stream(
            thread_id=thread_id,
            assistant_id=self.assistant_id,
            event_handler=self.EventHandler(),
        ) as stream:
            stream.until_done()
        logging.info("Response streaming completed")

    def fetch_response(self, thread_id: str):
        """
        Fetch the response from the assistant for the specified thread (non-streaming).

        Prints the assistant's text replies to stdout once the run reaches a
        terminal state.

        :param thread_id: The ID of the thread.
        """
        logging.info(f"Fetching response for thread {thread_id}...")
        # create_and_poll already blocks until a terminal state in normal
        # operation; the loop below is a defensive re-check.
        run = self.client.beta.threads.runs.create_and_poll(
            thread_id=thread_id, assistant_id=self.assistant_id
        )

        # Poll with a delay to reduce GET requests; stop on ANY terminal state.
        while run.status not in TERMINAL_RUN_STATUSES:
            time.sleep(2)  # Add a 2-second delay between checks
            run = self.client.beta.threads.runs.retrieve(
                thread_id=thread_id, run_id=run.id
            )
            logging.info(f"Run status: {run.status}")

        if run.status == "completed":
            messages = self.client.beta.threads.messages.list(thread_id=thread_id).data
            for message in messages:
                if message.role == "assistant":
                    for content in message.content:
                        if content.type == "text":
                            print(content.text.value)
        else:
            logging.error(f"Run failed with status: {run.status}")
            # incomplete_details is only populated on some terminal states.
            if getattr(run, "incomplete_details", None):
                logging.error(f"Incomplete details: {run.incomplete_details}")

    class EventHandler(AssistantEventHandler):
        """
        A class to handle events from the assistant's response stream.
        """

        @override
        def on_text_created(self, text) -> None:
            """
            Handle the event when text is created by the assistant.

            :param text: The created text.
            """
            logging.info("Text created by assistant")
            # Plain string: the original used an f-string with no placeholders.
            print("\nassistant > ", end="", flush=True)

        @override
        def on_text_delta(self, delta, snapshot):
            """
            Handle the event when there is a delta in the assistant's response.

            :param delta: The response delta.
            :param snapshot: The snapshot of the response.
            """
            print(delta.value, end="", flush=True)

        def on_tool_call_created(self, tool_call):
            """
            Handle the event when a tool call is created by the assistant.

            :param tool_call: The created tool call.
            """
            logging.info(f"Tool call created: {tool_call.type}")
            print(f"\nassistant > {tool_call.type}\n", flush=True)

        def on_tool_call_delta(self, delta, snapshot):
            """
            Handle the event when there is a delta in the assistant's tool call.

            :param delta: The tool call delta.
            :param snapshot: The snapshot of the tool call.
            """
            if delta.type == "code_interpreter":
                if delta.code_interpreter.input:
                    print(delta.code_interpreter.input, end="", flush=True)
                if delta.code_interpreter.outputs:
                    print(f"\n\noutput >", flush=True)
                    for output in delta.code_interpreter.outputs:
                        if output.type == "logs":
                            print(f"\n{output.logs}", flush=True)


def main(query: str, assistant_id: str, context: str, use_streaming: bool):
    """
    The main function to run the assistant query.

    :param query: The query to ask the assistant.
    :param assistant_id: The ID of the assistant.
    :param context: The context to set before the query.
    :param use_streaming: Boolean flag to determine if streaming should be used.
    """
    assistant = QueryAssistant(assistant_id=assistant_id)
    thread = assistant.create_thread()
    # Merge the context and query into a single message
    full_query = f"Context: {context}\nQuery: {query}"
    # Print the full query
    print("\n" + "=" * 100)
    print(f"{full_query}")
    print("=" * 100 + "\n")
    # Send the message
    assistant.create_message(thread_id=thread.id, content=full_query)
    if use_streaming:
        assistant.stream_response(thread_id=thread.id)
    else:
        assistant.fetch_response(thread_id=thread.id)
    print("\n")


if __name__ == "__main__":
    # Default query and context
    DEFAULT_QUERY = "What are you capable of as an assistant?"
    DEFAULT_CONTEXT = "Use your vector store to answer questions about the ArchiMajor Board. Take time to understand the context and introspect. If you don't know the answer simply respond with 'I don't know'. It is NEVER okay to return an empty response."

    # Parse command line arguments
    parser = argparse.ArgumentParser(description="Run an assistant query.")
    parser.add_argument(
        "--query",
        type=str,
        default=DEFAULT_QUERY,
        help="The query to ask the assistant.",
    )
    parser.add_argument(
        "--assistant_id",
        type=str,
        default="asst_W20wgEl0ZModBiMWzcgC0E3E",
        help="The assistant ID to use.",
    )
    parser.add_argument(
        "--context",
        type=str,
        default=DEFAULT_CONTEXT,
        help="The context to set before the query.",
    )
    parser.add_argument(
        "--use-streaming",
        action="store_true",
        help="Flag to determine if streaming should be used.",
    )

    # Run the main function with parsed arguments
    args = parser.parse_args()
    main(args.query, args.assistant_id, args.context, args.use_streaming)