Mothbox/AI/Mothbot/Mothbot_CreateDataset.py
2025-09-27 16:04:52 +02:00

1327 lines
44 KiB
Python

#!/usr/bin/env python3
import os
import json
import fiftyone as fo
import fiftyone.utils.image as foui
from hashlib import md5
from pathlib import Path
import numpy as np
from fiftyone.utils.patches import extract_patch
from PIL import Image
import fiftyone.core.labels as fol
import csv
from datetime import datetime
import re
import piexif
#import naturtag
#from naturtag import tag_images
#import exiv2
import subprocess
import threading
import argparse
import sys
import io #put these two lines here so radio can read stuff without breaking
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace")
import platform
#platform.system() # ' ' 'Linux'
# Import the function from json_to_csv_converter.py
from Mothbot_ConvertDatasettoCSV import json_to_csv
INPUT_PATH = r"C:\Users\andre\Desktop\donald\2022-01-11"
METADATA_PATH = r'..\Mothbox_Main_Metadata_Field_Sheet_Example - Form responses 1.csv'
#UTC_OFFSET= 8 #Panama is -5, Indonesia is 8 change for different locations
UTC_OFFSET= 0
TAXA_LIST_PATH = r"../SpeciesList_CountryIndonesia_TaxaInsecta_doi.org10.15468dl.8p8wua.csv" # downloaded from GBIF for example just insects in panama: https://www.gbif.org/occurrence/taxonomy?country=PA&taxon_key=216
SKIP_EXISTING_THUMBNAIL_PATCHES=True # If false, this will redo the
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument(
"--input_path",
required=False,
default=INPUT_PATH,
help="path to images for classification (ex: datasets/test_images/data)",
)
'''
parser.add_argument(
"--utcoff",
default=UTC_OFFSET,
help="rank to which to classify; must be column in --taxa-csv (default: {UTC_OFFSET})",
)
'''
parser.add_argument(
"--taxa_csv",
default=TAXA_LIST_PATH,
help="CSV with taxonomic labels to use for CustomClassifier (default: {SPECIES_LIST})",
)
parser.add_argument(
"--metadata",
default=METADATA_PATH,
help="Your CSV with metadata about deployments (default: {METADATA_PATH})",
)
return parser.parse_args()
class bcolors:
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKCYAN = '\033[96m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
def find_image_json_pairs(input_dir):
"""Finds pairs of image and JSON files with the same name in a given directory.
Args:
input_dir: The directory to search for files.
Returns:
A list of tuples, where each tuple contains the paths to the image and JSON files.
"""
image_files = [f for f in os.listdir(input_dir) if f.lower().endswith('.jpg') or f.lower().endswith('.png')]
json_files = [f for f in os.listdir(input_dir) if f.lower().endswith('.json')]
pairs = []
for image_file in image_files:
json_file = image_file[:-4] + '.json'
if json_file in json_files:
pairs.append((os.path.join(input_dir, image_file), os.path.join(input_dir, json_file)))
return pairs
def find_detection_matches(folder_path):
"""Finds matching triplets of .jpg, botdetection.json, and potentially a humandetection .json files in a given folder.
Args:
folder_path: The path to the folder to search.
Returns:
two lists of tuples, where each tuple contains the paths to a matching .jpg, botdetection.json,
or matching jpg and humandetection.json file.
"""
# ALL jpg files in the folder
jpg_files = [
os.path.join(folder_path, f)
for f in os.listdir(folder_path)
if f.endswith(".jpg")
]
# List of ALL json files in the folder
json_files = [
os.path.join(folder_path, f)
for f in os.listdir(folder_path)
if f.endswith(".json")
]
hu_detection_matches_list = []
bot_detection_matches_list = []
for jpg_file in jpg_files:
# target human file
humanD_json_file = jpg_file.replace(".jpg", ".json")
botD_json_file = jpg_file.replace(".jpg", "_botdetection.json")
if humanD_json_file in json_files:
hu_detection_matches_list.append((jpg_file,humanD_json_file))
if botD_json_file in json_files:
bot_detection_matches_list.append((jpg_file,botD_json_file))
return hu_detection_matches_list, bot_detection_matches_list
def load_taxon_keys(taxa_path, taxa_cols, taxon_rank="order", flag_det_errors=True):
print("Reading", taxa_path, "extracting", taxon_rank, "values.")
df = pl.read_csv(taxa_path, separator='\t') # Changed separator to '\t' for tab-delimited
target_values = set(
pl.Series(df.select(taxon_rank).drop_nulls())
.str.to_lowercase()
.unique()
.to_list()
)
print("Found", len(target_values), taxon_rank, "values: ")
#print(target_values)
return target_values
def load_taxon_keys_comma(taxa_path, taxa_cols, taxon_rank="order", flag_det_errors=True):
"""
Loads taxon keys from a Comma-delimited CSV file into a list.
Args:
taxa_path: String. Path to the taxa CSV file.
taxa_cols: List of strings. Taxonomic columns in taxa CSV to load (default: ["kingdom", "phylum", "class", "order", "family", "genus", "species"]).
taxon_rank: String. Taxonomic rank to which to classify images (must be present as column in the taxa csv at file_path). Default: "order".
flag_det_errors: Boolean. Whether to flag holes and smudges blanks (adds "hole" and "circle" and "background" and "blank" to taxon_keys). Default: True.
Returns:
taxon_keys: List. A list of taxon keys to feed to the CustomClassifier for bioCLIP classification.
"""
print("Reading", taxa_path, "extracting", taxon_rank, "values.")
df = pl.read_csv(taxa_path, separator='\t')
target_values = set(
pl.Series(df.select(taxon_rank).drop_nulls())
.str.to_lowercase()
.unique()
.to_list()
)
print("Found", len(target_values), taxon_rank, "values: ")
#print(target_values)
return target_values
def _without_first_prefix(name: str) -> str:
"""Return the string with the first underscore-separated prefix removed.
e.g. 'Indonesia_Les_Wilan...' -> 'Les_Wilan...'. If no underscore, returns original.
"""
if not name:
return name
parts = name.split('_', 1)
return parts[1] if len(parts) == 2 else name
def find_csv_match(input_path: str, metadata_path: str) -> dict:
"""
Finds a row in the CSV where 'deployment_name' matches the folder name of input_path.
Tolerates the presence/absence of the first leading prefix on either side.
Matching is case-insensitive.
If multiple matches are found, prints a warning and returns only the first one.
Returns:
dict: The first matching row as a dict, or {} if no match is found.
"""
parent_folder = os.path.basename(os.path.dirname(input_path)).strip()
alt_parent = _without_first_prefix(parent_folder)
# store variants in lowercase for case-insensitive matching
folder_variants = {parent_folder.lower(), alt_parent.lower()}
matches = []
print(f"scanning for metadata matches... (folder variants: {folder_variants})")
with open(metadata_path, mode='r', newline='', encoding='utf-8') as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
dep_name = (row.get("deployment_name") or "").strip()
if not dep_name:
continue
alt_dep = _without_first_prefix(dep_name)
dep_variants = {dep_name.lower(), alt_dep.lower()}
# if any variant intersects, it's a match
if folder_variants & dep_variants:
matches.append(row)
if not matches:
print(f"⚠️ No match found for '{parent_folder}' (or '{alt_parent}') in {metadata_path}")
return {}
if len(matches) > 1:
print(f"⚠️ Warning: Multiple matches found for '{parent_folder}', using the first one.")
print(f"✅ Matched deployment_name = '{matches[0].get('deployment_name')}'")
return matches[0]
def load_anylabeling_data(json_path): #TODO load METADATA STRAIGHT FROM CSV - METADATA_PATH - Maybe metadata gets loaded into its own 51 thing via the WHOLe dataset?
"""Loads data from an AnyLabeling JSON file.
Args:
json_path: The path to the JSON file.
Returns:
A dictionary containing the loaded data.
"""
with open(json_path, 'r') as f:
data = json.load(f)
image_height = data['imageHeight']
image_width = data['imageWidth']
creator=""
if(data['version'].startswith("Mothbot")):
detectionBy= data['version']
else:
detectionBy="HumanDetection"
# Extract relevant data from the detection labels
labels = data['shapes']
# Step 2: Initialize an empty dictionary to store metadata
#Skip metadata for now
metadata = {}
return labels, image_height, image_width, metadata, detectionBy
def handle_rotation_annotation(points):
"""Converts an oriented bounding box to a horizontal bounding box.
Args:
points: A list of points representing the vertices of the oriented bounding box.
Returns:
A tuple containing the top, left, width, and height of the horizontal bounding box.
"""
min_x = float('inf')
max_x = -float('inf')
min_y = float('inf')
max_y = -float('inf')
for point in points:
x, y = point
min_x = min(min_x, x)
max_x = max(max_x, x)
min_y = min(min_y, y)
max_y = max(max_y, y)
top = min_y
left = min_x
width = max_x - min_x
height = max_y - min_y
return top, left, width, height
def extract_number(raw_height):
"""
Extracts the numerical value from a string representing height.
Args:
raw_height: The string containing the height information.
Returns:
The numerical value of the height as a float, or None if no numerical value
could be extracted.
"""
# Use regular expression to find the first floating-point or integer number
match = re.search(r"[-+]?\d+\.?\d*|\d+", raw_height)
if match:
return float(match.group(0))
else:
return None
def write_taxonomy_with_exiv2_cli(image_path, taxonomic_list):
"""
Writes iNaturalist-readable taxonomy tags to an image using the exiv2 CLI.
"""
tags = []
for entry in taxonomic_list:
if "_" in entry:
level, value = entry.split("_", 1)
tag = f"taxonomy:{level.lower()}={value}"
# Add tag to both dc:subject and MicrosoftPhoto LastKeywordXMP
tags.append(f"-M\"set Xmp.dc.subject {tag}\"")
tags.append(f"-M\"set Xmp.MicrosoftPhoto.LastKeywordXMP {tag}\"")
# Combine the command
command = ["exiv2"] + tags + [image_path]
try:
result = subprocess.run(" ".join(command), shell=True, capture_output=True, text=True)
if result.returncode == 0:
print(f" Tags written to {image_path}") # cannot do ✅
else:
print(f" exiv2 failed:\n{result.stderr}") #cannot do ❌
except FileNotFoundError:
print(" exiv2 CLI tool not found. Make sure it's installed and in your system PATH.") #cannot do ❌
def add_taxonomy_subject_and_tags_exiv2(image_path, output_path, taxonomic_list):
"""
Adds taxonomy information to a photo's EXIF/XMP data using exiv2,
targeting fields iNaturalist is known to recognize.
Args:
image_path (str): Path to the input image file.
output_path (str): Path where the modified image will be saved.
taxonomic_list (list): A list of taxonomy strings like "Kingdom_Animalia".
"""
# Step 1: Build semicolon-separated taxonomy string for XMP/IPTC Keywords/Subject
subject_keywords = []
for item in taxonomic_list:
if "_" in item:
level, value = item.split("_", 1)
# iNaturalist often just looks for the raw scientific name in keywords,
# but including the "taxonomy:level=" format is good for general use.
# Let's add both for good measure, or just the value.
# For iNaturalist, a simple list of names (e.g., "Homo sapiens; Chordata")
# in Subject/Keywords is often sufficient.
subject_keywords.append(value.replace("_", " ")) # Space for readability
subject_keywords.append(f"taxonomy:{level.lower()}={value.replace('_', ' ')}")
# Make sure we have unique values and join them
unique_subject_keywords = sorted(list(set(subject_keywords)))
keywords_str = ";".join(unique_subject_keywords)
# For the XMP dc:title, iNaturalist often picks the most specific taxon
# or the full scientific name (Genus species) if available.
# Let's try to get the species if present, otherwise the lowest rank.
dc_title = ""
species_found = False
for item in reversed(taxonomic_list): # Iterate in reverse to get most specific
if "Species_" in item:
dc_title = item.split("Species_", 1)[1].replace("_", " ")
species_found = True
break
elif "_" in item and not dc_title: # If no species, take the lowest rank
dc_title = item.split("_", 1)[1].replace("_", " ")
if not dc_title and taxonomic_list: # Fallback if no specific rank found
dc_title = taxonomic_list[-1].split("_", 1)[1].replace("_", " ")
# Step 2: Open image with exiv2
try:
image = exiv2.ImageFactory.open(image_path)
image.readMetadata()
# Get existing metadata if any
exif_data = image.exifData()
iptc_data = image.iptcData()
xmp_data = image.xmpData()
# Step 3: Set EXIF/XMP/IPTC tags
# iNaturalist specifically mentions looking at Subject tags (which map to XMP/IPTC Keywords)
# and dc:title (XMP Title). UserComment is also checked.
# XMP Subject (often used for keywords/tags)
# exiv2 treats XMP arrays as lists directly
# First, clear existing Subject array if we're replacing
if 'Xmp.dc.subject' in xmp_data:
del xmp_data['Xmp.dc.subject']
# Add new subjects (keywords)
for keyword in unique_subject_keywords:
xmp_data.add('Xmp.dc.subject', keyword)
print(f"Set Xmp.dc.subject: {unique_subject_keywords}")
# IPTC Keywords (also often used for tags, syncs with XMP Subject)
# IPTC keywords are typically multi-string.
# Clear existing keywords if we're replacing
if 'Iptc.Application2.Keywords' in iptc_data:
del iptc_data['Iptc.Application2.Keywords']
for keyword in unique_subject_keywords:
iptc_data.add('Iptc.Application2.Keywords', keyword)
print(f"Set Iptc.Application2.Keywords: {unique_subject_keywords}")
# XMP dc:title (Title of the image, iNaturalist can parse this for taxon)
if dc_title:
xmp_data['Xmp.dc.title'] = dc_title
print(f"Set Xmp.dc.title: {dc_title}")
# UserComment (EXIF, more general notes)
# exiv2 handles encoding for UserComment. Just provide the string.
# The ASCII\x00\x00\x00 prefix is often managed by the library.
exif_data['Exif.Photo.UserComment'] = taxonomy_str
print(f"Set Exif.Photo.UserComment: {taxonomy_str}")
# --- Ensure common EXIF tags are present (optional, but good practice) ---
# iNaturalist often checks for DateTimeOriginal for observation date.
# If it's missing, add current timestamp.
if 'Exif.Photo.DateTimeOriginal' not in exif_data:
now = datetime.datetime.now().strftime("%Y:%m:%d %H:%M:%S")
exif_data['Exif.Photo.DateTimeOriginal'] = now
print(f"Added Exif.Photo.DateTimeOriginal: {now}")
# Ensure Exif.Image.DateTime (also a creation date field)
if 'Exif.Image.DateTime' not in exif_data:
now = datetime.datetime.now().strftime("%Y:%m:%d %H:%M:%S")
exif_data['Exif.Image.DateTime'] = now
print(f"Added Exif.Image.DateTime: {now}")
# Step 4: Write metadata back to the image
image.setExifData(exif_data)
image.setIptcData(iptc_data)
image.setXmpData(xmp_data)
image.writeMetadata(output_path)
print(f"Metadata written to: {output_path}")
except exiv2.Exiv2Error as e:
print(f"Exiv2 error: {e}")
except FileNotFoundError:
print(f"Error: Image not found at {image_path}")
except Exception as e:
print(f"An unexpected error occurred: {e}")
def add_taxonomy_subject_and_tags(image_path, output_path, taxonomic_list):
# Step 1: Build semicolon-separated taxonomy string
exif_subject = []
for item in taxonomic_list:
if "_" in item:
level, value = item.split("_", 1)
tag = f"taxonomy:{level.lower()}={value}"
exif_subject.append(tag)
taxonomy_str = ";".join(exif_subject) # For all EXIF fields
# taxonomy_str=taxonomy_str+";nothing"
# Step 2: Load image and EXIF
img = Image.open(image_path)
exif_bytes = img.info.get("exif")
if exif_bytes:
exif_dict = piexif.load(exif_bytes)
else:
exif_dict = {"0th": {}, "Exif": {}, "GPS": {}, "1st": {}, "thumbnail": None}
# Step 3: Encode XP fields in UTF-16LE with null terminator
def encode_xp(value):
return value.encode("utf-16le") + b'\x00\x00'
encoded_taxonomy = encode_xp(taxonomy_str)
# Step 4: Write to XPSubject and XPKeywords
exif_dict["0th"][piexif.ImageIFD.XPSubject] = encoded_taxonomy
exif_dict["0th"][piexif.ImageIFD.XPKeywords] = encoded_taxonomy
# Optional: still set UserComment (as plain UTF-8)
user_comment_bytes = b"ASCII\x00\x00\x00" + taxonomy_str.encode("utf-8")
exif_dict["Exif"][piexif.ExifIFD.UserComment] = user_comment_bytes
# Step 5: Save the updated image
exif_bytes = piexif.dump(exif_dict)
img.save(output_path, exif=exif_bytes)
print(f"Saved taxonomy (semicolon-separated) to Subject and Tags: {output_path}")
def write_taxonomy_with_naturtag_old(image_path, taxonomic_list, include_common_names=False):
"""
Writes taxonomy tags into image EXIF/XMP metadata using naturtag.
- image_path: path to the JPEG to tag
- taxonomic_list: list like ['KINGDOM_Animalia', 'PHYLUM_Arthropoda', ...]
- include_common_names: whether to add any common-name tags (usually False)
"""
# Build structured taxonomy keywords
keywords = []
for entry in taxonomic_list:
if "_" in entry:
level, val = entry.split("_", 1)
keywords.append(f"taxonomy:{level.lower()}={val}")
# Call tag_images correctly, passing keywords to the proper parameter
tag_images(
image_path,
keywords=keywords, # custom taxonomy tags
common_names=include_common_names,
create_xmp=True # ensures XMP metadata is embedded
)
print(f" Taxonomy tags written using naturtag to: {image_path}")
def write_taxonomy_with_naturtag(image_path, taxonomic_list, include_common_names=False):
# Construct taxonomy keywords
keywords = []
for entry in taxonomic_list:
if "_" in entry:
level, val = entry.split("_", 1)
keywords.append(f"taxonomy:{level.lower()}={val}")
# Wrap call to catch warnings or recover
try:
tag_images(
image_path,
keywords=keywords,
common_names=include_common_names,
create_xmp=True # ensures proper XMP embedding
)
print(f" Successfully tagged: {image_path}")
except Exception as e:
print(f" naturtag failed with error:\n{e}")
print("➡️ Attempting fallback: writing only EXIF tags via piexif")
# Fallback: write EXIF XP fields only
import piexif
from PIL import Image
def encode_xp(val):
return val.encode("utf-16le") + b'\x00\x00'
taxonomy_str = ";".join(keywords)
img = Image.open(image_path)
exif_bytes = img.info.get("exif")
exif_dict = piexif.load(exif_bytes) if exif_bytes else {"0th": {}, "Exif": {}, "GPS": {}, "1st": {}, "thumbnail": None}
exif_dict["0th"][piexif.ImageIFD.XPSubject] = encode_xp(taxonomy_str)
exif_dict["0th"][piexif.ImageIFD.XPKeywords] = encode_xp(taxonomy_str)
img.save(image_path, exif=piexif.dump(exif_dict))
print("Fallback EXIF tags written")
class ExifToolSession:
exifPath="exiftool" #mac or linux
if(platform.system()=='Windows'):
exifPath="../exiftool-13.32_64/exiftool"
def __init__(self, exiftool_path=exifPath):
self.process = subprocess.Popen(
[exiftool_path, "-stay_open", "True", "-@", "-"],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1 # line-buffered
)
# Start a background thread to drain stderr
self.stderr_output = []
self.stderr_thread = threading.Thread(target=self._drain_stderr, daemon=True)
self.stderr_thread.start()
def _drain_stderr(self):
for line in self.process.stderr:
self.stderr_output.append(line.strip())
def add_taxonomy_with_exiftool(self, full_patch_path, taxonomic_list):
args = []
# 1. Format taxonomy tags
for entry in taxonomic_list:
if "_" in entry:
level, value = entry.split("_", 1)
tag = f"taxonomy:{level.lower()}={value}"
args.append(f"-XMP-dc:Subject+={tag}")
args.append(f"-XMP-MicrosoftPhoto:LastKeywordXMP+={tag}")
# 2. Extract datetime from filename
filename = Path(full_patch_path).name
match = re.search(r"_(\d{4})_(\d{2})_(\d{2})__?(\d{2})_(\d{2})_(\d{2})", filename)
if match:
y, m, d, h, mi, s = match.groups()
datetime_str = f"{y}:{m}:{d} {h}:{mi}:{s}"
args.append(f"-DateTimeOriginal={datetime_str}")
args.append(f"-CreateDate={datetime_str}")
args.append(f"-ModifyDate={datetime_str}")
else:
print(f"⚠️ No datetime found in filename: {filename}")
# 3. Finalize arguments
args.extend([
"-overwrite_original",
"-fast2",
str(full_patch_path),
"-execute\n"
])
# 4. Send to ExifTool
self.process.stdin.write("\n".join(args))
self.process.stdin.flush()
# 5. Drain stdout
output_lines = []
while True:
line = self.process.stdout.readline()
if not line:
break
if line.strip() == "{ready}":
break
output_lines.append(line.strip())
if output_lines:
#print(f" ExifTool output for {full_patch_path}:")
for line in output_lines:
None
#print(" ", line)
def close(self):
self.process.stdin.write("-stay_open\nFalse\n")
self.process.stdin.flush()
self.process.wait()
def create_sample(image_path, labels, image_height, image_width, metadata, detection_creator):# skipping exif tagger for now, tagger):
"""Creates a FiftyOne sample using the 51 python interface
Args:
image_path: The path to the image file.
labels: A list of labels from the AnyLabeling JSON file.
image_height: The height of the image.
image_width: The width of the image.
Returns:
A FiftyOne JSON sample.
"""
sample = fo.Sample(
filepath= image_path,
)
sample["uploaded"]=metadata.get("uploaded","")
sample["sd"]=metadata.get("sd_card","")
sample["device"]=metadata.get("device","")
sample["firmware"]=str(metadata.get("firmware",""))
sample["sheet"]=metadata.get("sheet","")
sample["datasetcollection"]=metadata.get("dataset","") #call it a dataset collection here to not confuse with the fiftyonedataset
sample["project"]=metadata.get("project","")
sample["site"]=metadata.get("site","")
latitude=metadata.get("latitude","0.00000")
longitude=metadata.get("longitude","0.00000")
geolocation = fo.GeoLocation(latitude=latitude, longitude=longitude)
sample["location"]=geolocation
sample["longitude"]=longitude
sample["latitude"]=latitude
therawgroundheight=metadata.get("height_above_ground","-1")
sample["ground_height"]= extract_number(therawgroundheight)
sample["deployment_name"]=metadata.get("deployment_name","")
sample["deployment_date"]=metadata.get("deployment_date","")
sample["collect_date"]=metadata.get("collect_date","")
sample["data_storage_location"]=metadata.get("data_storage_location","")
sample["crew"]=metadata.get("crew","")
sample["notes"]=metadata.get("notes","")
sample["schedule"]=metadata.get("schedule","")
sample["habitat"]=metadata.get("habitat","")
sample["attractor"]=metadata.get("attractor","")
sample["attractor_location"]=metadata.get("attractor_location","")
sample["UTC"]=UTC_OFFSET
sample["detection_By"]=detection_creator
detections_list=[]
for label in labels:
direction = label['direction']
label_name = label['label']
theclusterID= label['clusterID']
l_timestamp_cluster= label['timestamp_cluster']
l_confidence_ID = label['confidence_ID']
l_timestamp_ID_bot = label['timestamp_ID_bot']
l_confidence_detection = label['confidence_detection']
l_identifier_bot = label['identifier_bot']
l_identifier_human = label['identifier_human']
l_timestamp_detection = label['timestamp_detection']
l_detector_bot = label['detector_bot']
l_species_list = label['species_list']
score = label['score']
points = label['points']
shape_type = label['shape_type']
ID_by = "IDby_"+label['description']
the_patch_path= label['patch_path']
desired_keys = ['kingdom', 'phylum', 'class', 'order', 'family', 'genus', 'species']
# Filter the dictionary to include only desired keys
filtered_dict = {key: value for key, value in label.items() if key in desired_keys}
# Check for unwanted values
unwanted_values = {"error", "ERROR", "Error"}
if any(any(u.lower() in value.lower() for u in unwanted_values) for value in filtered_dict.values()):
taxonomic_list = ["Error"]
else:
# Format the filtered dictionary
taxonomic_list = [f"{key.upper()}_{value}" for key, value in filtered_dict.items()]
full_patch_path=Path(INPUT_PATH+"/"+the_patch_path) #should work on mac or windows
# skipping exif tagger
#print("adding taxonomy with Exiftool...(can take a couple seconds)")
#tagger.add_taxonomy_with_exiftool(str(full_patch_path), taxonomic_list)
taxonomic_list.append(ID_by)
if shape_type == 'rotation': #Todo - these should be handled as a polygon in 51, because they only have regular rects they call "Detections" but we have rotated rects (that should be stored as polylines via polyline.fromrotatedboundingbox https://docs.voxel51.com/user_guide/using_datasets.html#rotated-bounding-boxes)
top, left, width, height = handle_rotation_annotation(points)
#print( top, left, width, height)
#print("The script will pause now. Press Enter to continue.")
#input()
# Normalize bounding box coordinates
top /= image_height
left /= image_width
width /= image_width
height /= image_height
# Create a FiftyOne detection
detection=fol.Detection(
tags=taxonomic_list,
label="creature",
bounding_box=[left, top, width, height],
confidence=score,
shape=shape_type,
rot_direction=direction,
patch_path=the_patch_path,
clusterID=theclusterID,
timestamp_cluster = l_timestamp_cluster,
confidence_ID = l_confidence_ID,
timestamp_ID_bot = l_timestamp_ID_bot,
confidence_detection = l_confidence_detection,
identifier_bot = l_identifier_bot,
identifier_human = l_identifier_human,
timestamp_detection = l_timestamp_detection,
detector_bot = l_detector_bot,
species_list = l_species_list,
)
detections_list.append(detection)
elif shape_type == 'polygon':
# Handle polygon annotations (adjust as needed)
None
#print("num detections")
#print(len(detections_list))
sample["creature_detections"] = fol.Detections(detections=detections_list) #TODO - give this an appropriate name
return sample
def generate_patch_dataset(dataset, output_dir=INPUT_PATH+"/patches", target_size=(1024, -1)):
"""
Generates thumbnails for images in a FiftyOne dataset, skipping existing ones.
Args:
dataset: The FiftyOne dataset.
output_dir: The directory to save the thumbnails.
target_size: The target size for the thumbnails (width, height).
Returns:
None
"""
patch_folder_path=Path(INPUT_PATH+"/patches")
patch_folder_path.mkdir(parents=True, exist_ok=True)
samples_to_process = []
patch_samples = []
for sample in dataset.iter_samples(progress=True):
filename = os.path.basename(sample.filepath) #this is just the basename that it stores!
sample_fullpath=INPUT_PATH+"/"+filename
print(sample.filename)
#print(sample)
detections= sample.creature_detections.detections
detector=sample.detection_By
detnum=0
for detection in detections:
patchfullpath=INPUT_PATH+"/"+detection.patch_path
inferred_patchfilename=filename.split('.')[0] + "_" + str(detnum) +"_"+detector+ "." +filename.split('.')[1]
inferred_patchfullpath = Path(patch_folder_path) / f'{inferred_patchfilename}'
#export_image(patch, patch_path,filename, detnum)
# Extract coordinates
xmin, ymin, xmax, ymax = detection.bounding_box
# Calculate width and height
p_width = xmax - xmin
p_height = ymax - ymin
patch_sample = fo.Sample(
#filepath_fullimage= sample.filepath,
filepath_fullimage=sample_fullpath,
filepath = str(patchfullpath),
tags= detection.tags,
label="detection",
location=sample.location,
longitude=sample.longitude,
latitude=sample.latitude,
bounding_box=detection.bounding_box,
patch_width=p_width,
patch_height=p_height,
#attributes={},
#ID_by=detection.ID_by,
confidence=detection.confidence,
clusterID=detection.clusterID,
timestamp_cluster = detection.timestamp_cluster,
confidence_ID = detection.confidence_ID,
timestamp_ID_bot = detection.timestamp_ID_bot,
confidence_detection = detection.confidence_detection,
identifier_bot = detection.identifier_bot,
identifier_human = detection.identifier_human,
timestamp_detection = detection.timestamp_detection,
detector_bot = detection.detector_bot,
species_list = detection.species_list,
shape=detection.shape,
direction=detection.rot_direction,
#direction = sample.direction,
#label_name = label['label']
uploaded=sample.uploaded,
device=sample.device,
sd=sample.sd, #Dots might be bad in key name
firmware=sample.firmware,
sheet=sample.sheet,
datasetcollection=sample.datasetcollection,
project=sample.project,
site=sample.site,
ground_height=sample.ground_height,
deployment_name = sample.deployment_name,
deployment_date = sample.deployment_date,
collect_date = sample.collect_date,
data_storage_location = sample.data_storage_location,
crew=sample.crew,
notes=sample.notes,
schedule=sample.schedule,
habitat=sample.habitat,
attractor=sample.attractor,
attractor_location=sample.attractor_location,
UTC=sample.UTC,
detection_By=sample.detection_By
)
# Disabling for now
#add GPS info to the thumbnail patch
#print("adding GPS to "+patchfullpath)
#add_gps_exif(patchfullpath, patchfullpath,float(sample.latitude), float(sample.longitude))
patch_samples.append(patch_sample)
detnum=detnum+1
#sample.save()
patch_ds = fo.Dataset()
patch_ds.add_samples(patch_samples)
patch_ds.app_config['media_fields'] = ['filepath', 'filepath_fullimage']
patch_ds.app_config['grid_media_field'] = 'filepath'
patch_ds.app_config['modal_media_field'] = 'filepath'
patch_ds.save()
dataset.save()
return patch_ds
def deg_to_dms_rational(deg_float):
"""Convert decimal degrees to degrees, minutes, seconds in rational format"""
deg = int(deg_float)
min_float = abs(deg_float - deg) * 60
minute = int(min_float)
sec_float = (min_float - minute) * 60
sec = int(sec_float * 10000)
return ((abs(deg), 1), (minute, 1), (sec, 10000))
def add_gps_exif(input_path, output_path, lat, lng, altitude=None):
# Load image
img = Image.open(input_path)
# Try to load existing EXIF data, or start fresh
exif_bytes = img.info.get("exif")
if exif_bytes:
exif_dict = piexif.load(exif_bytes)
else:
exif_dict = {"0th": {}, "Exif": {}, "GPS": {}, "1st": {}, "thumbnail": None}
# Check if GPS data already exists
gps_existing = exif_dict.get("GPS", {})
if gps_existing.get(piexif.GPSIFD.GPSLatitude) and gps_existing.get(piexif.GPSIFD.GPSLongitude):
print("GPS data already exists. No changes made.")
return
# Create GPS IFD
gps_ifd = {
piexif.GPSIFD.GPSLatitudeRef: 'N' if lat >= 0 else 'S',
piexif.GPSIFD.GPSLatitude: deg_to_dms_rational(lat),
piexif.GPSIFD.GPSLongitudeRef: 'E' if lng >= 0 else 'W',
piexif.GPSIFD.GPSLongitude: deg_to_dms_rational(lng),
}
if altitude is not None:
gps_ifd[piexif.GPSIFD.GPSAltitudeRef] = 0 if altitude >= 0 else 1
gps_ifd[piexif.GPSIFD.GPSAltitude] = (int(abs(altitude * 100)), 100)
# Inject GPS into EXIF
exif_dict['GPS'] = gps_ifd
exif_bytes = piexif.dump(exif_dict)
# Save the image with new EXIF
img.save(output_path, exif=exif_bytes)
print(f"Saved image with GPS data: {output_path}")
def generate_patch_thumbnails_orig(dataset, output_dir=INPUT_PATH+"/patches", target_size=(1024, -1)):
"""
Generates thumbnails for images in a FiftyOne dataset, skipping existing ones.
Args:
dataset: The FiftyOne dataset.
output_dir: The directory to save the thumbnails.
target_size: The target size for the thumbnails (width, height).
Returns:
None
"""
patch_folder_path=Path(INPUT_PATH+"/patches")
patch_folder_path.mkdir(parents=True, exist_ok=True)
samples_to_process = []
patch_samples = []
for sample in dataset.iter_samples(progress=True):
filename = os.path.basename(sample.filepath) #this is just the basename that it stores!
sample_fullpath=INPUT_PATH+"/"+filename
#thumbnail_path = f"{output_dir}/{filename}"
#sample["thumbnail_path"]=thumbnail_path
patch_path = INPUT_PATH+"/patches"
print(sample.filename)
#print(sample)
detections= sample.creature_detections.detections
detector=sample.detection_By
#detector=detector.replace('.pt','')
#print(detections)
#print(len(detections))
#print(dataset.get_field_schema())
#input("Press Enter to continue...")
detnum=0
for detection in detections:
patchfilename=filename.split('.')[0] + "_" + str(detnum) +"_"+detector+ "." +filename.split('.')[1]
patchfullpath = Path(patch_folder_path) / f'{patchfilename}'
#export_image(patch, patch_path,filename, detnum)
if not os.path.exists(patchfullpath) and SKIP_EXISTING_THUMBNAIL_PATCHES==False: #skip thumbs already generated unless we specifically target to overwrite them each time
# Load the image using PIL and convert it to a NumPy array
img = Image.open(sample_fullpath)
# Convert the image to a NumPy array for patch extraction
img_array = np.array(img)
# Extract the patch using your custom function
patch_array = extract_patch(img_array, detection=detection)
# Convert the extracted patch back to a PIL Image
patch_image = Image.fromarray(patch_array)
# Save the thumbnail
patch_image.save(patchfullpath)
# Extract coordinates
xmin, ymin, xmax, ymax = detection.bounding_box
# Calculate width and height
p_width = xmax - xmin
p_height = ymax - ymin
patch_sample = fo.Sample(
#filepath_fullimage= sample.filepath,
filepath_fullimage=sample_fullpath,
filepath = str(patchfullpath),
tags= detection.tags,
label="detection",
location=sample.location,
longitude=sample.longitude,
latitude=sample.latitude,
bounding_box=detection.bounding_box,
patch_width=p_width,
patch_height=p_height,
#attributes={},
#ID_by=detection.ID_by,
confidence=detection.confidence,
clusterID=detection.clusterID,
timestamp_cluster = detection.timestamp_cluster,
confidence_ID = detection.confidence_ID,
timestamp_ID_bot = detection.timestamp_ID_bot,
confidence_detection = detection.confidence_detection,
identifier_bot = detection.identifier_bot,
identifier_human = detection.identifier_human,
timestamp_detection = detection.timestamp_detection,
detector_bot = detection.detector_bot,
species_list = detection.species_list,
shape=detection.shape,
direction=detection.direction,
#direction = sample.direction,
#label_name = label['label']
uploaded=sample.uploaded,
device=sample.device,
sd=sample.sd, #Dots might be bad in key name
firmware=sample.firmware,
sheet=sample.sheet,
datasetcollection=sample.datasetcollection,
project=sample.project,
site=sample.site,
ground_height=sample.ground_height,
deployment_name = sample.deployment_name,
deployment_date = sample.deployment_date,
collect_date = sample.collect_date,
data_storage_location = sample.data_storage_location,
crew=sample.crew,
notes=sample.notes,
schedule=sample.schedule,
habitat=sample.habitat,
attractor=sample.attractor,
attractor_location=sample.attractor_location,
UTC=sample.UTC,
detection_By=sample.detection_By
)
patch_samples.append(patch_sample)
detnum=detnum+1
#sample.save()
patch_ds = fo.Dataset()
patch_ds.add_samples(patch_samples)
patch_ds.app_config['media_fields'] = ['filepath', 'filepath_fullimage']
patch_ds.app_config['grid_media_field'] = 'filepath'
patch_ds.app_config['modal_media_field'] = 'filepath'
patch_ds.save()
dataset.save()
return patch_ds
if __name__ == "__main__":
### START
args = parse_args()
INPUT_PATH=args.input_path
METADATA_PATH=args.metadata
TAXA_LIST_PATH=args.taxa_csv
#UTC_OFFSET=int(args.utcoff)
#pairs = find_image_json_pairs(INPUT_PATH)
hu_pairs, bot_pairs = find_detection_matches(INPUT_PATH)
samples=[]
# Iterate through human pairs and load data
metadata= find_csv_match(INPUT_PATH, METADATA_PATH)
UTC_OFFSET = float(metadata.get("UTC",0))
#skipping exif stuff for now, will break out to separate step
#tagger = ExifToolSession()
for image_path, json_path in hu_pairs:
full_image_path = image_path
labels, image_height, image_width, notmetadata, detection_creator = load_anylabeling_data(json_path)
sample = create_sample(full_image_path, labels, image_height, image_width, metadata, detection_creator)#, tagger)
samples.append(sample)
for image_path, json_path in bot_pairs:
full_image_path = image_path
labels, image_height, image_width, notmetadata, detection_creator = load_anylabeling_data(json_path)
sample = create_sample(full_image_path, labels, image_height, image_width, metadata, detection_creator)#, tagger )
samples.append(sample)
#tagger.close()
# Create dataset
dataset = fo.Dataset()
dataset.add_samples(samples)
# Generate some thumbnail images
thepatch_dataset = generate_patch_dataset(dataset)
# Customize the sidebar configuration
# Get the default sidebar groups for the dataset
sidebar_groups = fo.DatasetAppConfig.default_sidebar_groups(thepatch_dataset)
# Collapse the `tags`, `metadata`, and `primitives` sections by default
sidebar_groups[0].expanded = True # tags
sidebar_groups[1].expanded = False # metadata
sidebar_groups[2].expanded = False # labels
sidebar_groups[3].expanded = False # primitives
active_fields = fo.DatasetAppConfig.default_active_fields(thepatch_dataset)
active_fields.paths.extend(["clusterID"])
#active_fields.paths.extend(["sample tags"])
thepatch_dataset.app_config.active_fields = active_fields
# Create a custom color scheme
color_scheme=fo.ColorScheme(
color_pool=["#771e1e", "#289128","#185C5C", "#1f1fd3","#d31f73" ,"#AF621A", "#1a6ac5","#27a560", "#cc00ff","#a5c70f", "#ff7b00",],
fields=[
{
"path": "ground_truth",
"colorByAttribute": "eval",
"valueColors": [
# false negatives: blue
{"value": "fn", "color": "#0000ff"},
# true positives: green
{"value": "tp", "color": "#00ff00"},
]
},
{
"path": "predictions",
"colorByAttribute": "eval",
"valueColors": [
# false positives: red
{"value": "fp", "color": "#ff0000"},
# true positives: green
{"value": "tp", "color": "#00ff00"},
]
},
{
"path": "segmentations",
"maskTargetsColors": [
# 12: red
{"intTarget": 12, "color": "#ff0000"},
# 15: green
{"intTarget": 15, "color": "#00ff00"},
]
}
],
color_by="value",
opacity=0.5,
default_colorscale= {"name": "rdbu", "list": None},
colorscales=[
{
# field definition overrides the default_colorscale
"path": "clusterID",
# if name is defined, it will override the list
"name": None,
"list": [
{"value": -1.0, "color": "rgb(0,255,255)"},
{"value": 15, "color": "rgb(255,0,0)"},
{"value": 60.0, "color": "rgb(0,0,255)"},
],
}
],
)
# Apply the sidebar groups configuration to the app config
thepatch_dataset.app_config.sidebar_groups = sidebar_groups
# Save the updated app config
thepatch_dataset.compute_metadata() # can kill its progress bar like this: progress=False)
thepatch_dataset.save()
# Export the dataset without saving the image data (It gets saved as "sample.json" and "metadata.json" in the inputpath folder)
thepatch_dataset.export(
export_dir=INPUT_PATH,
dataset_type=fo.types.FiftyOneDataset,
export_media=False, # This ensures only labels and metadata are saved
#progress=False
)
# Let's automatically generate the CSV now too, just to be nice
json_to_csv(INPUT_PATH, TAXA_LIST_PATH)
print(thepatch_dataset)
# Sort the dataset by patch_width in ascending order
sorted_dataset = thepatch_dataset.sort_by("clusterID",True)
# Launch the FiftyOne App with the sorted view
session = fo.launch_app(sorted_dataset, color_scheme=color_scheme)
print(f"{bcolors.OKGREEN}The app is running, open your browser to use{bcolors.ENDC}")
print(f"{bcolors.WARNING} or press CTRL+C to kill app{bcolors.ENDC}")
session.wait(-1)