Mothbox/AI/Mothbot/Mothbot_ID.py
2026-02-17 13:39:42 -05:00

1115 lines
40 KiB
Python

#!/usr/bin/env python3
"""
MOTHBOT_ID
This script looks for mothbox image data and detection data, pairs them together, finds the region of interest in the image of the detection
feeds this ROI to pyBIOCLIP to try to get an ID
the pybioclip also takes in GBIF species lists
Get list of taxa from just specific region in GBIF
ex:
country = 'PA' #2 letter country code https://en.wikipedia.org/wiki/ISO_3166-1_alpha-2 "Panama"==PA
classKey = '216' # just insects i think
Example search in GBIF
https://www.gbif.org/occurrence/taxonomy?country=PA&taxon_key=212
Usage:
python Mothbox_ID.py
Arguments:
-h, --help Show this help message and exit
"""
import ssl
ssl._create_default_https_context = ssl._create_unverified_context #needed for some macs to automatically download files associated with some of the libraries
print("Loading all the ID libraries...")
import polars as pl
import os
import sys
import json
import argparse
import re
import io
from pathlib import Path
import numpy as np
from PIL import Image
from PIL import ImageFile
import torch
from datetime import datetime
ImageFile.LOAD_TRUNCATED_IMAGES = (
True # makes ok for use images that are messed up slightly
)
from bioclip import TreeOfLifeClassifier, Rank, CustomLabelsClassifier
from bioclip.predict import create_classification_dict
import importlib.metadata
VERSION = "pybioclip_"+importlib.metadata.version("pybioclip")
print("ID model: "+VERSION)
# ~~~~Variables to Change~~~~~~~
INPUT_PATH = (
r"D:\MothboxData_Hubert\data\Panama\Hoya_119m_bothDeer_2025-01-26\2025-01-26" # raw string
)
SPECIES_LIST = r"../SpeciesList_CountryPanamaCostaRica_TaxaInsecta_doi.org10.15468dl.6nxkw6.csv" # downloaded from GBIF for example just insects in panama: https://www.gbif.org/occurrence/taxonomy?country=PA&taxon_key=212
""" KINGDOM = 0
PHYLUM = 1
CLASS = 2
ORDER = 3
FAMILY = 4
GENUS = 5
SPECIES = 6"""
TAXONOMIC_RANK_FILTER_num = 3 #!!! change this number to change the taxonomic rank we filter with. IE filter to order with "3" or filter to genus with "5"
# you can See if a json file has an existing ID by looking at identifier_bot: pybioclip
OVERWRITE_EXISTING_IDs = True #True
#you probably always want these below as true
ID_HUMANDETECTIONS = True
ID_BOTDETECTIONS = True
# ~~~~Other Global Variables~~~~~~~
TAXA_COLS = ["kingdom", "phylum", "class", "order", "family", "genus", "species"]
TAXONOMIC_RANK_FILTER = Rank.ORDER
TOL_TAXONOMIC_RANK = "species" # Change this to "species" to target just the species in your CSV # Note i think this is actually just always needs to be set for SPECIES for this exampple
DOMAIN = "Eukarya" # basically our "creature" tag? figure we will never see a prokaryote on the mothbox # Also i think GBIF has a "Biota" category that is a fancier version of "creature" or "life"
taxa_path = SPECIES_LIST
# Paths to save filtered list of embeddings/labels
image_embeddings_path = INPUT_PATH + "/image_embeddings.npy"
embedding_labels_path = INPUT_PATH + "/embedding_labels.json"
#print(torch.cuda.is_available())
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
DOI= ""
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument(
"--input_path",
required=False,
default=INPUT_PATH,
help="path to images for classification (ex: datasets/test_images/data)",
)
parser.add_argument(
"--TOLrank",
default=TOL_TAXONOMIC_RANK,
#help="rank to which to classify; must be column in --taxa-csv (default: {TAXONOMIC_RANK})", #this always needs to just be left at species i think
)
parser.add_argument(
"--rank",
default=TAXONOMIC_RANK_FILTER_num,
help=f"rank to which to classify; must be column in --taxa-csv (default: {TAXONOMIC_RANK_FILTER_num})",
)
parser.add_argument(
"--flag-det-errors",
default=True,
action=argparse.BooleanOptionalAction,
help="whether to flag detection errors like holes and smudges (default: --flag-det-errors)",
)
parser.add_argument(
"--taxa_csv",
default=SPECIES_LIST,
help=f"CSV with taxonomic labels to use for CustomClassifier (default: {SPECIES_LIST})",
)
parser.add_argument(
"--taxa_cols",
default=TAXA_COLS,
help=f"taxonomic columns in taxa CSV to load (default: {TAXA_COLS})",
)
parser.add_argument(
"--device",
required=False,
choices=["cpu", "cuda"],
default=DEVICE,
help="device on which to run pybioblip ('cpu' or 'cuda', default: what your comp detects)",
)
parser.add_argument(
"--ID_Hum",
required=False,
default=ID_HUMANDETECTIONS,
help="ID detections made by humans?",
)
parser.add_argument(
"--ID_Bot",
required=False,
default=ID_BOTDETECTIONS,
help="ID detections made by robots?",
)
parser.add_argument(
"--overwrite_prev_bot_ID",
default=OVERWRITE_EXISTING_IDs,
required=False,
help="If IDs already exist, should we overwrite?",
)
return parser.parse_args()
# FUNCTIONS ~~~~~~~~~~~~~
def current_timestamp() -> str:
"""
Returns the current timestamp in format:
YYYY-MM-DD__HH_MM_SS_(±HHMM)
"""
now = datetime.now().astimezone() # local time with UTC offset
return now.strftime("%Y-%m-%d__%H_%M_%S_(%z)")
def load_taxon_keys(taxa_path, taxa_cols=None, taxon_rank="order", flag_det_errors=True):
"""
Read taxa_path (path, bytes, or file-like) robustly handling encoding issues
and return a set of unique, lowercased values for taxon_rank.
Returns:
set(str): lowercased unique taxon_rank values
"""
print(f"Reading {taxa_path!s}, extracting {taxon_rank} values.")
# encodings to try in order (utf-8 first, then common windows/latin fallbacks)
encodings = ("utf-8", "utf-8-sig", "utf-16", "cp1252", "latin-1")
raw = None
text = None
# Accept bytes/bytearray directly
if isinstance(taxa_path, (bytes, bytearray)):
raw = bytes(taxa_path)
# Accept an already-open file-like object
elif hasattr(taxa_path, "read"):
try:
# try reading bytes first (some file-like objects are in binary mode)
raw = taxa_path.read()
except Exception:
# if that fails, try text-mode read
taxa_path.seek(0)
text = taxa_path.read()
# Otherwise treat it as a filesystem path-like
else:
p = Path(taxa_path)
if not p.exists():
raise FileNotFoundError(f"taxa_path not found: {taxa_path}")
with open(p, "rb") as f:
raw = f.read()
# If we already have text (not bytes), use it
if text is None:
# If raw is already str (unlikely) use it
if isinstance(raw, str):
text = raw
else:
# Try the encodings in order
decoded = None
for enc in encodings:
try:
decoded = raw.decode(enc)
# quick sanity check: if decoding produced something non-empty, accept it
if decoded is not None:
text = decoded
break
except Exception:
continue
# Last-resort: decode with replacement so we never raise UnicodeDecodeError
if text is None:
text = raw.decode("utf-8", errors="replace")
# Try to load into Polars; prefer tab-separated but fall back to automatic parsing.
try:
df = pl.read_csv(io.StringIO(text), separator="\t")
except Exception:
try:
df = pl.read_csv(io.StringIO(text)) # let polars infer delimiter
except Exception:
# final fallback: use pandas to parse then convert to polars
import pandas as pd
df_pd = pd.read_csv(io.StringIO(text), sep="\t")
df = pl.from_pandas(df_pd)
# If user provided a taxa_cols mapping/dictionary, prefer it for column lookup
chosen_col = None
if taxa_cols and isinstance(taxa_cols, dict):
chosen_col = taxa_cols.get(taxon_rank, None)
# If chosen_col not set or not present, do case-insensitive matching against dataframe columns
if not chosen_col or chosen_col not in df.columns:
cols_lower = {c.lower(): c for c in df.columns}
if taxon_rank.lower() in cols_lower:
chosen_col = cols_lower[taxon_rank.lower()]
elif chosen_col and chosen_col in df.columns:
# keep chosen_col if it exists
pass
else:
raise KeyError(f"taxon_rank '{taxon_rank}' not found in file columns: {list(df.columns)}")
# Extract unique non-null values and normalize to lowercase strings
try:
vals = df[chosen_col].drop_nulls().unique().to_list()
target_values = {str(v).lower() for v in vals if v is not None and str(v).strip() != ""}
except Exception:
# conservative fallback: iterate rows if the vectorized route fails
vals = []
for rec in df.select(chosen_col).to_dicts():
v = rec.get(chosen_col)
if v is not None:
vals.append(v)
target_values = {str(v).lower() for v in vals if str(v).strip() != ""}
print("Found", len(target_values), taxon_rank, "values.")
return target_values
def load_taxon_keys_old(taxa_path, taxa_cols, taxon_rank="order", flag_det_errors=True):
print("Reading", taxa_path, "extracting", taxon_rank, "values.")
df = pl.read_csv(taxa_path, separator='\t') # Changed separator to '\t' for tab-delimited
target_values = set(
pl.Series(df.select(taxon_rank).drop_nulls())
.str.to_lowercase()
.unique()
.to_list()
)
print("Found", len(target_values), taxon_rank, "values: ")
#print(target_values)
return target_values
def load_taxon_keys_comma(taxa_path, taxa_cols, taxon_rank="order", flag_det_errors=True):
"""
Loads taxon keys from a Comma-delimited CSV file into a list.
Args:
taxa_path: String. Path to the taxa CSV file.
taxa_cols: List of strings. Taxonomic columns in taxa CSV to load (default: ["kingdom", "phylum", "class", "order", "family", "genus", "species"]).
taxon_rank: String. Taxonomic rank to which to classify images (must be present as column in the taxa csv at file_path). Default: "order".
flag_det_errors: Boolean. Whether to flag holes and smudges blanks (adds "hole" and "background" and "blank" to taxon_keys). Default: True.
Returns:
taxon_keys: List. A list of taxon keys to feed to the CustomClassifier for bioCLIP classification.
"""
print("Reading", taxa_path, "extracting", taxon_rank, "values.")
df = pl.read_csv(taxa_path)
target_values = set(
pl.Series(df.select(taxon_rank).drop_nulls())
.str.to_lowercase()
.unique()
.to_list()
)
print("Found", len(target_values), taxon_rank, "values: ")
#print(target_values)
return target_values
#We don't use this function much anymore
def process_files_in_directory(data_path, classifier, taxon_rank="order"):
"""
Processes files within a specified subdirectory.
Args:
data_path: String. The path to the directory containing files.
classifier: CustomLabelsClassifier object from TAXA_KEYS_CSV.
taxon_rank: String. Taxonomic rank to which to classify images (must be present as column in the taxa csv at file_path). Default: "order".
"""
# Example: Print all file names in the subdirectory
for file in os.listdir(data_path):
file_path = os.path.join(data_path, file)
if os.path.isfile(file_path):
print(f"File: {file_path}")
img_list = [f for f in os.listdir(data_path) if f.endswith(".jpg") or f.endswith(".JPG") or f.endswith(".PNG") or f.endswith(".png")]
if not img_list:
# No imgs were found in base level
sys.exit("No .jpg images found in the data path: " + data_path)
else:
predictions = {}
# Analyze the files
print(f"Found {len(img_list)} .jpg images. \n Getting predictions...")
i = 1
for file in img_list:
filename = os.path.splitext(file)[0]
# print(filename)
data = os.path.join(data_path, file)
print(f"\n img # {str(i)} out of {str(len(img_list))}")
i = i + 1
# Run inference
results = classifier.predict(data)
classifier.predict_classifications_from_list() # def predict_classifications_from_list(img: Union[PIL.Image.Image, str], cls_ary: List[str], device: Union[str, torch.device] = 'cpu') -> dict[str, float]:
sorted_results = sorted(results, key=lambda x: x["score"], reverse=True)
# Get the highest scoring result
winner = sorted_results[0]
pred = winner["classification"]
# Print the winner
print(
filename
+ f" This is the winner: {pred} with a score of {winner['score']}"
)
key = f"data/{file}"
predstring = str(pred).strip().lower()
print(predstring)
if predstring in ["hole", "background", "wall", "floor", "blank", "sky"]:
predictions[key] = f"abiotic_{pred}"
else:
predictions[key] = taxon_rank + "_" + pred
return predictions
def find_date_folders(directory):
"""
Recursively searches through a directory and its subdirectories for folders
with names in the YYYY-MM-DD format.
Args:
directory: The directory to search.
Returns:
A list of paths to the found folders, including the root directory if it matches the date format.
"""
#date_regex = r"^\d{4}-\d{2}-\d{2}$"
date_regex = re.compile(r"^(?:\d{4}-\d{2}-\d{2}|[A-Za-z0-9]+_\d{4}-\d{2}-\d{2})$")
folders = []
# Check if the root directory itself matches the date format
if re.match(date_regex, os.path.basename(directory)):
folders.append(directory)
for root, dirs, files in os.walk(directory):
for dir_name in dirs:
if re.match(date_regex, dir_name):
folders.append(os.path.join(root, dir_name))
return folders
def find_detection_matches(folder_path):
"""Finds matching triplets of .jpg, botdetection.json, and potentially a humandetection .json files in a given folder.
Args:
folder_path: The path to the folder to search.
Returns:
two lists of tuples, where each tuple contains the paths to a matching .jpg, botdetection.json,
or matching jpg and humandetection.json file.
"""
# ALL jpg files in the folder
jpg_files = [
os.path.join(folder_path, f)
for f in os.listdir(folder_path)
if f.endswith(".jpg")
]
# List of ALL json files in the folder
json_files = [
os.path.join(folder_path, f)
for f in os.listdir(folder_path)
if f.endswith(".json")
]
hu_detection_matches_list = []
bot_detection_matches_list = []
for jpg_file in jpg_files:
# target human file
humanD_json_file = jpg_file.replace(".jpg", ".json")
botD_json_file = jpg_file.replace(".jpg", "_botdetection.json")
if humanD_json_file in json_files:
hu_detection_matches_list.append((jpg_file,humanD_json_file))
if botD_json_file in json_files:
bot_detection_matches_list.append((jpg_file,botD_json_file))
return hu_detection_matches_list, bot_detection_matches_list
def calculate_rotation_angle(points):
"""Calculates the rotation angle of the rectangle."""
# Implement a suitable algorithm to calculate the angle based on the points
# For example, using the slope of the first two points
p1, p2 = points[:2]
angle = np.arctan2(p2[1] - p1[1], p2[0] - p1[0]) * 180 / np.pi
return angle
def rotate_image_to_vertical(image, angle):
"""Rotates an image to vertical orientation based on the given angle."""
image = image.rotate(-angle, expand=True)
return image
"""
#don't use anymore
def crop_rect(
img, rect, interpolation=cv2.INTER_LINEAR
): # cv2.INTER_LANCZOS4 cv2.INTER_LINEAR cv2.INTER_CUBIC
# get the parameter of the small rectangle
center, size, angle = rect[0], rect[1], rect[2]
center, size = tuple(map(int, center)), tuple(map(int, size))
# get row and col num in img
height, width = img.shape[0], img.shape[1]
# calculate the rotation matrix
M = cv2.getRotationMatrix2D(center, angle, 1)
# rotate the original image
img_rot = cv2.warpAffine(img, M, (width, height), flags=interpolation)
# now rotated rectangle becomes vertical, and we crop it
img_crop = cv2.getRectSubPix(img_rot, size, center)
return img_crop, img_rot
"""
def update_main_list(main_list, new_items):
"""Updates the main list with new items, avoiding duplicates.
Args:
main_list: The main list to update.
new_items: A list of new items to add.
Returns:
The updated main list.
"""
# Create a set of existing items for efficient lookup
existing_items = set(main_list)
# Add new items to the main list if they don't exist
for item in new_items:
if item not in existing_items:
main_list.append(item)
return main_list
# not sure we use this anymore
def rotate_cropped(img, points):
# print("shape of cnt: {}".format(points.shape))
rect = cv2.minAreaRect(points)
# print("rect: {}".format(rect))
# the order of the box points: bottom left, top left, top right,
# bottom right
box = cv2.boxPoints(rect)
box = np.int0(box)
# print("bounding box: {}".format(box))
cv2.drawContours(img, [box], 0, (0, 0, 255), 2)
# get width and height of the detected rectangle
width = int(rect[1][0])
height = int(rect[1][1])
src_pts = box.astype("float32")
# coordinate of the points in box points after the rectangle has been
# straightened
dst_pts = np.array(
[[0, height - 1], [0, 0], [width - 1, 0], [width - 1, height - 1]],
dtype="float32",
)
# the perspective transformation matrix
M = cv2.getPerspectiveTransform(src_pts, dst_pts)
# directly warp the rotated rectangle to get the straightened rectangle
warped = cv2.warpPerspective(img, M, (width, height))
return warped
def get_rotated_rect_coordinates(json_file):
"""Reads rotated rectangle coordinates from a JSON file and returns them."""
with open(json_file, "r") as f:
data = json.load(f)
coordinates_list = []
for shape in data["shapes"]:
if shape["shape_type"] == "rotation":
points = shape["points"]
x, y, w, h, angle = extract_rectangle_coordinates(points)
coordinates_list.append((x, y, w, h, angle))
return coordinates_list
def get_rotated_rect_raw_coordinates(json_file):
"""Reads rotated rectangle coordinates from a JSON file and returns them."""
pre_ided = False # variable to detect if this has already been IDed
with open(json_file, "r") as f:
data = json.load(f)
coordinates_list = []
pre_ided_list = []
patch_list = []
for shape in data["shapes"]:
if shape["shape_type"] == "rotation":
patch=shape["patch_path"]
patch_list.append(patch)
points = shape["points"]
# x, y, w, h, angle = extract_rectangle_coordinates(points)
coordinates_list.append(points)
if "identifier_bot" in shape:
if shape["identifier_bot"] != "": # detect if there's been an identification (if so it would say something like pybioclip)
pre_ided = True
#print("it was previously IDed")
pre_ided_list.append(pre_ided)
return coordinates_list, pre_ided_list, patch_list
def extract_rectangle_coordinates(points):
"""Extracts rectangle coordinates from a list of points."""
# Assuming points are in clockwise order (adjust if needed)
min_x = min(point[0] for point in points)
min_y = min(point[1] for point in points)
max_x = max(point[0] for point in points)
max_y = max(point[1] for point in points)
width = max_x - min_x
height = max_y - min_y
angle = calculate_rotation_angle(points)
return min_x, min_y, width, height, angle
def calculate_rotation_angle(points):
"""Calculates the rotation angle of the rectangle."""
# Implement a suitable algorithm to calculate the angle based on the points
# For example, using the slope of the first two points
p1, p2 = points[:2]
angle = np.arctan2(p2[1] - p1[1], p2[0] - p1[0]) * 180 / np.pi
return angle
#not sure we use this
def rotate_image_around_center(image, angle):
"""Rotates an image around its center by the specified angle."""
cv_image = np.array(image)
height, width, _ = cv_image.shape
center = (width // 2, height // 2)
M_translate = np.float32([[1, 0, -center[0]], [0, 1, -center[1]]])
translated_image = cv2.warpAffine(cv_image, M_translate, (width, height))
rotated_image = cv2.rotate(translated_image, angle=angle)
M_translate_back = np.float32([[1, 0, center[0]], [0, 1, center[1]]])
final_image = cv2.warpAffine(rotated_image, M_translate_back, (width, height))
return final_image
def crop_image(image, x, y, w, h):
"""Crops an image based on the specified coordinates."""
cropped_image = image.crop((x, y, x + w, y + h))
return cropped_image
#not sure this is being used
def warp_rotation(img, points):
# cnt = np.array(points)
cnt = np.array([[int(x), int(y)] for x, y in points])
# print("shape of cnt: {}".format(cnt.shape))
rect = cv2.minAreaRect(cnt)
# print("rect: {}".format(rect))
# the order of the box points: bottom left, top left, top right,
# bottom right
box = cv2.boxPoints(rect)
box = np.int0(box)
# print("bounding box: {}".format(box))
# cv2.drawContours(img, [box], 0, (0, 0, 255), 2)
# get width and height of the detected rectangle
width = int(rect[1][0])
height = int(rect[1][1])
src_pts = box.astype("float32")
# coordinate of the points in box points after the rectangle has been
# straightened
dst_pts = np.array(
[[0, height - 1], [0, 0], [width - 1, 0], [width - 1, height - 1]],
dtype="float32",
)
# the perspective transformation matrix
M = cv2.getPerspectiveTransform(src_pts, dst_pts)
# directly warp the rotated rectangle to get the straightened rectangle
warped = cv2.warpPerspective(img, M, (width, height))
return warped
def get_bioclip_prediction(img_path, classifier):
# Run inference
results = classifier.predict(img_path, rank=TAXONOMIC_RANK_FILTER)
classifier.predict_classifications_from_list() # def predict_classifications_from_list(img: Union[PIL.Image.Image, str], cls_ary: List[str], device: Union[str, torch.device] = 'cpu') -> dict[str, float]:
sorted_results = sorted(results, key=lambda x: x["score"], reverse=True)
# Get the highest scoring result
winner = sorted_results[0]
pred = winner["classification"]
# Print the winner
print(f" This is the winner: {pred} with a score of {winner['score']}")
return pred
def get_bioclip_prediction_imgpath(img, classifier):
# create a PIL image array
images = [img]
winner = ""
winnerprob = ""
img_embeddings = classifier.create_image_features(images)
for probs in classifier.create_probabilities(img_embeddings, classifier.txt_embeddings):
topk = probs.topk(k=5)
index = 0
for pred in classifier.format_grouped_probs(
"", probs, rank=TAXONOMIC_RANK_FILTER, min_prob=1e-9, k=5
): # TODO make it so tags get saved for all ranks, and specify deepest rank? #this shows the depth of the order it searches to
#print(pred)
if index == 0:
kingdom = pred["kingdom"]
#print(str(TAXONOMIC_RANK_FILTER.get_label()))
winner = pred[
str(TAXONOMIC_RANK_FILTER.get_label())
] # get the correct ID based on the deepest order we are searching
winnerprob = pred["score"]
winningdict = pred
index = index + 1
# Print the winner
print(f" This is the winner: {winner} with a score of {winnerprob}")
return winner, winnerprob, winningdict
def get_bioclip_prediction_PILimg(img, classifier):
# create a PIL image array
images = [img]
winner = ""
winnerprob = ""
img_embeddings = classifier.create_image_features(images)
for probs in classifier.create_probabilities(img_embeddings, classifier.txt_embeddings):
topk = probs.topk(k=5)
index = 0
for pred in classifier.format_grouped_probs(
"", probs, rank=TAXONOMIC_RANK_FILTER, min_prob=1e-9, k=5
): # TODO make it so tags get saved for all ranks, and specify deepest rank? #this shows the depth of the order it searches to
#print(pred)
if index == 0:
kingdom = pred["kingdom"]
#print(str(TAXONOMIC_RANK_FILTER.get_label()))
winner = pred[
str(TAXONOMIC_RANK_FILTER.get_label())
] # get the correct ID based on the deepest order we are searching
winnerprob = pred["score"]
winningdict = pred
index = index + 1
# Print the winner
print(f" This is the winner: {winner} with a score of {winnerprob}")
return winner, winnerprob, winningdict
def update_json_labels_and_scores(json_path, index, pred, conf, winningdict):
"""Updates the "label" and "score" entries for a specific shape in a JSON file.
Args:
json_path: The path to the JSON file.
index: The index of the shape to update (0-based).
pred: The new label value.
conf: The new score value.
"""
# TODO add winningdict here correctly
with open(json_path, "r") as f:
data = json.load(f)
if 0 <= index < len(data["shapes"]):
shape = data["shapes"][index]
# do stuff here now
shape["identifier_bot"] = VERSION
shape["species_list"]= DOI
shape["timestamp_ID_bot"]=current_timestamp()
shape["confidence_ID"] = conf
predstring = str(pred).strip().lower()
if predstring in ["hole", "background", "wall", "floor", "blank", "sky"]:
shape["label"] = "ERROR_" + pred
else:
shape["label"] = str(TAXONOMIC_RANK_FILTER).replace("Rank.", "") + "_" + pred
#shape["description"] = (
# VERSION # Put what Robot did the ID, put "" for human / ground_truth
#)
# Add taxonomic ranks only if they exist in the winningdict
for rank in [
"kingdom",
"phylum",
"class",
"order",
"family",
"genus",
"species",
]:
if rank in winningdict:
if winningdict[rank].strip().lower() in ["hole", "background", "wall", "floor", "blank", "sky"]:
shape[rank] = "ERROR_" + winningdict[rank]
else:
shape[rank] = winningdict[rank]
with open(json_path, "w") as f:
json.dump(data, f, indent=4)
def add_metadata_to_json(json_path, metadata_path):
"""Adds metadata from a separate JSON file to an existing JSON file.
Args:
json_path: The path to the JSON file to modify.
metadata_path: The path to the JSON file containing the metadata to add.
"""
with open(json_path, "r") as f:
data = json.load(f)
with open(metadata_path, "r") as f:
metadata = json.load(f)
# Check if the 'metadata' key exists in the data
if "metadata" not in data:
data["metadata"] = [] # Create an empty 'metadata' list if it doesn't exist
# Add metadata to the existing 'metadata' list, avoiding duplicates
for key, value in metadata.items():
if not any(item.get(key) for item in data["metadata"]):
data["metadata"].append({key: value})
with open(json_path, "w") as f:
json.dump(data, f, indent=4)
print(f"Metadata added to {json_path}")
# Patch get_txt_names to always use UTF-8
def fixed_get_txt_names(self):
txt_names_json = self.get_cached_datafile("embeddings/txt_emb_species.json")
with open(txt_names_json, encoding="utf-8") as fd:
return json.load(fd)
def ID_matched_img_json_pairs(
hu_matched_img_json_pairs,bot_matched_img_json_pairs, taxa_path, taxa_cols, taxon_rank, device, flag_the_det_errors):
# derive cache filename
cache_path = os.path.splitext(taxa_path)[0] + ".pt"
if os.path.exists(cache_path):
print(f"Loading cached embeddings from {cache_path}")
cache = torch.load(cache_path, map_location=device)
TreeOfLifeClassifier.get_txt_names = fixed_get_txt_names # weird patch
classifier = TreeOfLifeClassifier(device=device) # still need classifier structure
classifier.txt_names = cache["txt_names"]
classifier.txt_embeddings = cache["txt_embeddings"].to(device)
print("TOL: Loaded number of labels:", len(classifier.txt_names))
print("TOL: Loaded image embeddings shape:", classifier.txt_embeddings.shape)
#return classifier
# ----------------------
# No cache → build fresh
# ----------------------
else:
# load up the Pybioclip stuff
taxon_keys_list = load_taxon_keys(
taxa_path=taxa_path,
taxa_cols=taxa_cols,
taxon_rank=taxon_rank.lower(),
flag_det_errors=flag_the_det_errors,
)
target_values = taxon_keys_list
print("Loading TOL classifier")
TreeOfLifeClassifier.get_txt_names = fixed_get_txt_names # weird patch to make it not give an undefined character error!
classifier = TreeOfLifeClassifier(device=DEVICE) #it used to crash here
print("TOL: number of labels:", len(classifier.txt_names))
print("TOL: image embeddings shape:", classifier.txt_embeddings.shape)
print("Finding embeddings matching the targets.")
found_items = []
for i, txt_name in enumerate(classifier.txt_names):
name_dict = create_classification_dict(txt_name, Rank.SPECIES)
if name_dict[taxon_rank].lower() in target_values:
found_items.append((i, txt_name))
print("Found", len(found_items), "embeddings matching the", taxon_rank, "values")
print("Building the image embedding tensor")
txt_feature_ary = []
new_txt_names = []
for i, txt_name in found_items:
txt_feature_ary.append(classifier.txt_embeddings[:, i])
new_txt_names.append(txt_name)
print("Creating embeddings for custom labels")
custom_labels = ["hole", "background", "wall", "floor", "blank", "sky"]
clc = CustomLabelsClassifier(custom_labels, device=DEVICE)
for i, label in enumerate(custom_labels):
txt_feature_ary.append(clc.txt_embeddings[:, i])
new_txt_names.append([[label, label, label, label, label, "", label], label])
#HERE TO FIX
classifier.txt_names = new_txt_names
classifier.txt_embeddings = torch.stack(txt_feature_ary, dim=1)
print("TOL: Updated number of labels:", len(classifier.txt_names))
print("TOL: Updated image embeddings shape:", classifier.txt_embeddings.shape)
# ----------------------
# Save cache
# ----------------------
print(f"Saving embeddings cache to {cache_path}")
torch.save({
"txt_names": classifier.txt_names,
"txt_embeddings": classifier.txt_embeddings.cpu(), # save portable CPU tensors
}, cache_path)
#Process Human Detections
print("processing Human Detections.........")
patch_paths_hu = [] # define this once before your loop
json_paths_hu = []
idx_paths_hu = []
if(ID_HUMANDETECTIONS):
# Next process each pair and generate temporary files for the ROI of each detection in each image
# Iterate through image-JSON pairs
index = 0
numofpairs = len(hu_matched_img_json_pairs)
for pair in hu_matched_img_json_pairs:
# Load JSON file and extract rotated rectangle coordinates for each detection
image_path, json_path = pair[:2] # Always extract the first two elements
coordinates_of_detections_list, was_pre_ided_list,thepatch_list = (
get_rotated_rect_raw_coordinates(json_path)
)
index = index + 1
print(
str(index)
+ "/"
+ str(numofpairs)
+ " | "
+ str(len(coordinates_of_detections_list)),
"HUMAN detections in " + json_path,
)
if coordinates_of_detections_list:
for idx, coordinates in enumerate(coordinates_of_detections_list):
# print(coordinates)
if was_pre_ided_list[idx] and OVERWRITE_EXISTING_IDs==False: # skip processing if IDed
continue
patchfullpath=os.path.dirname(image_path)+"/"+ thepatch_list[idx]
patchPIL=Image.open(patchfullpath)
pred, conf, winningdict = get_bioclip_prediction_imgpath(patchPIL,classifier)
# next we can make a copy of the detection json with IDs / or figure out how to ADD the IDs
update_json_labels_and_scores(json_path, idx, pred, conf, winningdict)
#add path to list of patches for perceptual processing
patch_paths_hu.append(patchfullpath)
json_paths_hu.append(json_path)
idx_paths_hu.append(idx)
#Process BOT Detections
print("processing BOT Detections.........")
patch_paths_bots = [] # define this once before your loop
json_paths_bots = []
idx_paths_bots = []
if(ID_BOTDETECTIONS):
# Next process each pair and generate temporary files for the ROI of each detection in each image
# Iterate through image-JSON pairs
index = 0
numofpairs = len(bot_matched_img_json_pairs)
for pair in bot_matched_img_json_pairs:
# Load JSON file and extract rotated rectangle coordinates for each detection
image_path, json_path = pair[:2] # Always extract the first two elements
coordinates_of_detections_list, was_pre_ided_list,thepatch_list = (
get_rotated_rect_raw_coordinates(json_path)
)
index = index + 1
print(
str(index)
+ "/"
+ str(numofpairs)
+ " | "
+ str(len(coordinates_of_detections_list)),
"BOT detections in " + json_path,
)
if coordinates_of_detections_list:
for idx, coordinates in enumerate(coordinates_of_detections_list):
# print(coordinates)
if was_pre_ided_list[idx] and OVERWRITE_EXISTING_IDs==False: # skip processing if IDed
continue
patchfullpath=os.path.dirname(image_path)+"/"+ thepatch_list[idx]
patchPIL=Image.open(patchfullpath)
pred, conf, winningdict = get_bioclip_prediction_imgpath(patchPIL,classifier)
# next we can make a copy of the detection json with IDs / or figure out how to ADD the IDs
update_json_labels_and_scores(json_path, idx, pred, conf, winningdict)
#add path to list of patches for later perceptual processing
patch_paths_bots.append(patchfullpath)
json_paths_bots.append(json_path)
idx_paths_bots.append(idx)
def extract_doi_from_csv_path(csv_path: str) -> str:
"""
Extracts the DOI from a filename like:
SpeciesList_..._doi.org10.15468dl.epzeza.csv
and returns the full DOI URL.
Works for any DOI variant formatted as "doi.org10....".
Returns "no_doi" if no valid DOI is found.
"""
filename = os.path.basename(csv_path)
# Try to find the DOI chunk (everything after 'doi.org' up to .csv)
match = re.search(r"(doi\.org[0-9A-Za-z\.\-]+)", filename)
if not match:
return "no_doi"
doi_raw = match.group(1) # e.g. "doi.org10.15468dl.epzeza"
doi_core = doi_raw.replace("doi.org", "") # e.g. "10.15468dl.epzeza"
# General DOI rule: starts with "10." and has a slash somewhere
# Fix by inserting a slash between the prefix and the rest
m = re.match(r"(10\.\d+)(.+)", doi_core)
if not m:
return "no_doi"
prefix, suffix = m.groups()
doi_fixed = f"{prefix}/{suffix}"
return f"https://doi.org/{doi_fixed}"
if __name__ == "__main__":
"""
First the script takes in a INPUT_PATH
Then, (to simplify its searching) it looks through all the folders for folders that are just a single "night"
and follow the date format YYYY-MM-DD for their structure
in each of these folders, it looks to see if there are any .json
"""
args = parse_args()
taxon_filter_num=int(args.rank)
TAXONOMIC_RANK_FILTER = Rank(taxon_filter_num)
OVERWRITE_EXISTING_IDs= bool(int(args.overwrite_prev_bot_ID)) #note arg parser can't do booleans, so you need this workaround!
ID_BOTDETECTIONS=bool(int(args.ID_Bot))
ID_HUMANDETECTIONS=bool(int(args.ID_Hum))
INPUT_PATH= args.input_path
# get species list DOI
DOI=extract_doi_from_csv_path(args.taxa_csv)
print("using species list: "+DOI)
# Check if CUDA is available
if torch.cuda.is_available():
print("CUDA is available!")
print("CUDA version:", torch.version.cuda)
print("Number of GPUs:", torch.cuda.device_count())
print("Current device:", torch.cuda.current_device())
print("GPU Name:", torch.cuda.get_device_name(torch.cuda.current_device()))
DEVICE = torch.device("cuda")
else:
print("CUDA not available, using CPU")
DEVICE = torch.device("cpu")
# Find all the dated folders that our data lives in
print("Looking in this folder for MothboxData: " + INPUT_PATH)
date_folders = find_date_folders(INPUT_PATH)
print(
"Found ",
str(len(date_folders)) + " dated folders potentially full of mothbox data",
)
# Look in each dated folder for .json detection files and the matching .jpgs
hu_matched_img_json_pairs = []
bot_matched_img_json_pairs = []
for folder in date_folders:
hu_list_of_matches, bot_list_of_matches = find_detection_matches(folder)
hu_matched_img_json_pairs = update_main_list(
hu_matched_img_json_pairs, hu_list_of_matches
)
bot_matched_img_json_pairs = update_main_list(
bot_matched_img_json_pairs, bot_list_of_matches
)
print(
"Found ",
str(len(hu_matched_img_json_pairs))
+ " pairs of images and HUMAN detection data to try to ID",
)
# Example Pair
print("example human detection and json pair:")
if(len(hu_matched_img_json_pairs)>0):
print(hu_matched_img_json_pairs[0])
print(
"Found ",
str(len(bot_matched_img_json_pairs))
+ " pairs of images and BOT detection data to try to ID",
)
# Example Pair
print("example human detection and json pair:")
if(len(bot_matched_img_json_pairs)>0):
print(bot_matched_img_json_pairs[0])
# Now that we have our data to be processed in a big list, it's time to load up the Pybioclip stuff
ID_matched_img_json_pairs(
hu_matched_img_json_pairs,
bot_matched_img_json_pairs,
taxon_rank=args.TOLrank,
flag_the_det_errors=args.flag_det_errors,
taxa_path=args.taxa_csv,
taxa_cols=args.taxa_cols,
device=DEVICE,
)
print("Finished Automatic Identification")