# Mothbox/AI/Mothbot/Mothbot_ConvertDatasettoCSV.py
# Snapshot metadata: 2025-09-23 00:51:08 -05:00 — 395 lines, 16 KiB, Python
#!/usr/bin/env python3
import json
import os
import datetime
import csv
import re
from datetime import datetime, timedelta
#from unidecode import unidecode
#from pygbif import species
import unicodedata
import pandas as pd
import argparse
INPUT_PATH = r"C:\Users\andre\Desktop\MB_Test_Zone\Indonesia_Les_WilanTopTree_HopeCobo_2025-06-25\2025-06-26"
#UTC_OFFSET=-5 #panama is -5 indonesia is 8
# Specify the path to your taxonomy CSV file
TAXA_LIST_PATH = r"../SpeciesList_CountryIndonesia_TaxaInsecta.csv"
TAXA = ['kingdom', 'phylum', 'class', 'order', 'family', 'genus', 'species']
def parse_args():
    """Parse command-line options for the dataset-to-CSV converter.

    Returns:
        argparse.Namespace with ``input_path`` (dataset folder containing
        samples.json) and ``taxa_csv`` (tab-separated GBIF species list).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input_path",
        required=False,
        default=INPUT_PATH,
        help="path to images for classification (ex: datasets/test_images/data)",
    )
    '''
    parser.add_argument(
        "--utcoff",
        default=UTC_OFFSET,
        help="rank to which to classify; must be column in --taxa-csv (default: {UTC_OFFSET})",
    )
    '''
    parser.add_argument(
        "--taxa_csv",
        default=TAXA_LIST_PATH,
        # Bug fix: the original help string printed a literal "{SPECIES_LIST}"
        # (it was not an f-string, and the variable is actually
        # TAXA_LIST_PATH). %(default)s lets argparse show the real default.
        help="CSV with taxonomic labels to use for CustomClassifier (default: %(default)s)",
    )
    return parser.parse_args()
def OLDcreate_occurrence_id(filename, latitude, longitude):
    """Build a legacy occurrenceID from an image filename and coordinates.

    The ID is ``<first-8-filename-parts, '_'→'-'>-<lat5>-<lon5>-<trailing n>``
    where lat/lon keep only their first five digits.

    Args:
        filename: image file name (e.g. "..._2025_06_26__21_15_03_HDR_5.jpg").
        latitude: latitude as a string (e.g. "9.123456").
        longitude: longitude as a string (e.g. "-79.54321").

    Returns:
        The assembled occurrenceID string.
    """
    # Step 1: Process filename — normalize to ASCII, lowercase, strip.
    # Bug fix: the original called unidecode(), but the `unidecode` import is
    # commented out at the top of the file, so this raised NameError when
    # called. unicodedata (already imported) provides an equivalent ASCII fold.
    filename = (
        unicodedata.normalize("NFKD", filename)
        .encode("ascii", "ignore")
        .decode("ascii")
        .lower()
        .strip()
    )
    # Remove .jpg extension and extract relevant part of the filename
    filename = filename.replace(".jpg", "")
    # Extract part before the 9th underscore
    parts = filename.split('_')
    if len(parts) >= 9:
        filename_part = '_'.join(parts[:8])
    else:
        filename_part = '_'.join(parts)
    filename_part = filename_part.replace("_", "-")
    # Step 2: Extract the number right before .jpg (fallback "0")
    final_number = parts[-1] if parts[-1].isdigit() else "0"
    # Step 3: Process latitude — drop non-digits, keep first 5 characters
    cleaned_lat = re.sub(r'[^0-9]', '', latitude)
    lat = cleaned_lat[:5]
    # Step 4: Process longitude — drop non-digits, keep first 5 characters
    cleaned_lon = re.sub(r'[^0-9]', '', longitude)
    lon = cleaned_lon[:5]
    # Step 5: Combine into occurrenceID
    occurrence_id = f"{filename_part}-{lat}-{lon}-{final_number}"
    return occurrence_id
def adjust_timestamp_with_utc_offset(date_str, time_str, utc_offset):
    """Shift a local "YYYY_MM_DD" / "HH_MM_SS" pair by ``utc_offset`` hours.

    Args:
        date_str: date formatted as "YYYY_MM_DD".
        time_str: time formatted as "HH_MM_SS".
        utc_offset: hours to add (may be negative or fractional).

    Returns:
        Tuple ``(date, time)`` in the same underscore-separated formats.
    """
    combined = datetime.strptime(date_str + " " + time_str, "%Y_%m_%d %H_%M_%S")
    shifted = combined + timedelta(hours=utc_offset)
    return shifted.strftime("%Y_%m_%d"), shifted.strftime("%H_%M_%S")
def format_datetime_with_utc_offset(date_str, time_str, utc_offset):
    """Format a local "YYYY_MM_DD" / "HH_MM_SS" pair as ISO 8601 with offset.

    Args:
        date_str: local date formatted as "YYYY_MM_DD".
        time_str: local time formatted as "HH_MM_SS".
        utc_offset: the site's UTC offset in hours (may be fractional,
            e.g. 5.5 → "+0530").

    Returns:
        e.g. "2025-06-26T12:00:00-0500" — the local wall-clock time paired
        with its UTC offset (a valid ISO 8601 representation of the instant).
    """
    # Step 1: Parse the local date and time
    local_dt = datetime.strptime(f"{date_str} {time_str}", "%Y_%m_%d %H_%M_%S")
    # Bug fix: the original subtracted the offset (converting to UTC) but then
    # still appended the *local* offset suffix, which misrepresented the
    # instant by twice the offset. In ISO 8601 the offset suffix qualifies the
    # preceding local time, so keep the wall-clock time unshifted.
    # Step 2: Determine the sign and format the UTC offset
    offset_sign = "+" if utc_offset >= 0 else "-"
    abs_offset = abs(utc_offset)
    offset_hours = int(abs_offset)
    offset_minutes = int((abs_offset - offset_hours) * 60)
    formatted_offset = f"{offset_sign}{offset_hours:02d}{offset_minutes:02d}"
    # Step 3: Format the final datetime string in the desired format
    formatted_datetime = local_dt.strftime("%Y-%m-%dT%H:%M:%S")
    return f"{formatted_datetime}{formatted_offset}"
def get_deepest_classification(kingdom="", phylum="", class_="", order="", family="", genus="", species=""):
    """Return (rank_name, value) for the most specific non-empty rank.

    Walks TAXA from species up to kingdom and returns the first rank with a
    truthy value, or (None, None) if every rank is empty.
    """
    # Bug fix: the original read arguments back via locals().get(), which is a
    # snapshot whose behavior inside the loop is implementation-dependent.
    # An explicit mapping is equivalent and robust.
    values = {
        "kingdom": kingdom,
        "phylum": phylum,
        "class": class_,  # 'class' is a keyword, so the parameter is class_
        "order": order,
        "family": family,
        "genus": genus,
        "species": species,
    }
    for rank in reversed(TAXA):
        value = values.get(rank, "")
        if value:
            return rank, value
    return None, None
# Query GBIF for taxonomic details of a single name.
# This works great but can be VERY slow.
# NOTE: relies on pygbif's `species` module, whose import is commented out at
# the top of the file; without it the NameError is swallowed by the broad
# except below and (None, None, None) is returned.
def get_gbif_info_online(rank, name):
    """Look up ``name`` at rank ``rank`` on GBIF.

    Returns:
        Tuple ``(taxon_id, scientific_name, common_name)``, each possibly
        None; all-None on no match or any error.
    """
    def _to_ascii(text):
        # Fold accented characters down to plain ASCII.
        return unicodedata.normalize('NFKD', text).encode('ascii', 'ignore').decode('utf-8')

    try:
        hits = species.name_lookup(q=name, rank=rank, limit=1)
        matches = hits['results']
        if not matches:
            print(f"No GBIF results found for {rank}: {name}")
            return None, None, None
        top = matches[0]
        taxon_id = top.get('key', None)
        scientific_name = top.get('scientificName', None)
        common_name = top.get('vernacularName', None)
        if scientific_name:
            scientific_name = _to_ascii(scientific_name)
        if common_name:
            common_name = _to_ascii(common_name)
        return taxon_id, scientific_name, common_name
    except Exception as e:
        print(f"Error fetching GBIF data: {e}")
        return None, None, None
# Load the taxa lookup table (GBIF species-list export) for fast local lookups.
def load_taxa_lookup(taxa_list_path):
    """Read the tab-separated taxa list at ``taxa_list_path`` into a DataFrame."""
    return pd.read_csv(taxa_list_path, sep='\t')
def find_deepest_taxon_info(taxa_list, taxa_values, taxa_lookup):
    """Resolve the deepest matching taxon in the lookup table.

    Args:
        taxa_list: rank names, shallow→deep (e.g. kingdom ... species).
        taxa_values: taxon names aligned with ``taxa_list`` ("" / None = unset).
        taxa_lookup: DataFrame with 'taxonRank', 'taxonKey', 'scientificName'
            plus one column per rank name.

    Returns:
        ``(taxonKey, scientificName)`` of the deepest match, or (None, None).
    """
    # Pair each rank with its value, drop blanks, then walk deepest-first.
    candidates = [
        (rank, value)
        for rank, value in zip(taxa_list, taxa_values)
        if value not in ("", None)
    ]
    for rank, value in reversed(candidates):
        # Rows registered at this rank (case-insensitive).
        rank_rows = taxa_lookup[taxa_lookup['taxonRank'].str.lower() == rank.lower()]
        # Skip ranks that have no corresponding column in the table.
        if rank not in rank_rows.columns:
            continue
        hit = rank_rows[rank_rows[rank].str.lower() == value.lower()]
        if hit.empty:
            continue
        # GBIF species lists carry no common name, so only key + name.
        return hit['taxonKey'].values[0], hit['scientificName'].values[0]
    print(str(taxa_values) + " No match found at any rank level.")
    return None, None
def create_uniquedatasetID(deployment_name, oid_dict):
    """Build a unique dataset ID from a deployment name and a Mongo-style oid.

    Args:
        deployment_name: base name for the deployment.
        oid_dict: mapping expected to contain a '$oid' key.

    Returns:
        "<deployment_name>_oid_<oid>" when '$oid' is present, otherwise just
        the deployment name unchanged.
    """
    try:
        return f"{deployment_name}_oid_{oid_dict['$oid']}"
    except KeyError:
        # No '$oid' key — fall back to the bare deployment name.
        return f"{deployment_name}"
def json_to_csv(input_path, taxa_list_path):
    """Convert a FiftyOne dataset export (``samples.json``) into a Darwin Core
    style occurrence CSV written inside ``input_path``.

    Args:
        input_path: dataset folder containing ``samples.json``.
        taxa_list_path: tab-separated GBIF species list used to resolve
            taxonKey/scientificName per record.

    Side effects:
        Writes ``<parentfolder>_<folder>_exportdate_<YYYY-MM-DD>.csv`` into
        ``input_path``; prints progress and diagnostic messages.
    """
    #preload this stuff for faster lookup
    taxa_lookup = load_taxa_lookup(taxa_list_path)
    # Get the last folder name from the input path
    folder_name = os.path.basename(input_path)
    # Get the parent folder name
    parent_folder = os.path.basename(os.path.dirname(input_path))
    # Get the current date in YYYY-MM-DD format
    current_date = datetime.today().strftime('%Y-%m-%d')
    # Create the output CSV file name
    output_file = f"{parent_folder}_{folder_name}_exportdate_{current_date}.csv"
    # Append "samples.json" to the input path to get the correct file path
    json_file_path = os.path.join(input_path, "samples.json")
    try:
        with open(json_file_path, "r") as f:
            data = json.load(f)
        print("Found fiftyone dataset's json path")
    except FileNotFoundError:
        print("Could not find fiftyone dataset's json path, maybe try a different folder?")
        return
    with open(input_path+"/"+output_file, "w", newline="") as csvfile:
        # NOTE(review): "detection_confidence" is declared here but never
        # populated when rows are written, so DictWriter emits it empty.
        fieldnames = ["basisOfRecord","datasetID","parentEventID","eventID","occurrenceID","verbatimEventDate","eventDate","eventTime","UTCOFFSET","detectionBy","detection_confidence","identifiedBy","ID_confidence","kingdom","phylum","class","order","family","genus","species","taxonID","commonName","scientificName","filepath", "mothbox","software","sheet","country", "area", "point","latitude","longitude","ground_height","deployment_name","deployment_date","collect_date", "data_storage_location","crew", "notes", "schedule","habitat", "image_id", "label", "bbox", "segmentation", "attractor", "attractor_location"] # Adjust fieldnames as needed
        csv_writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        csv_writer.writeheader()
        for sample in data["samples"]:
            #tags are the only thing directly editable in 51
            # Per-sample defaults; the tag-parsing loop below overwrites them.
            detectionBy=""  # NOTE(review): never read — the row uses sample["detection_By"] instead; confirm which is intended
            identified_by=""
            kingdom=""
            phylum=""
            tclass=""  # 'class' is a reserved word, hence tclass
            order=""
            family=""
            genus=""
            species=""
            commonName=""
            scientificName=""
            taxon_id=""
            ground_height=""  # NOTE(review): unused; the row reads sample["ground_height"]
            attractor=""  # NOTE(review): unused; the row reads sample["attractor"]
            attractor_location=""  # NOTE(review): unused; the row reads sample["attractor_location"]
            UTC=""  # NOTE(review): unused; the row reads sample["UTC"]
            # Regular expression pattern to extract date and timestamp
            # (e.g. "..._2025_06_26__21_15_03...") from the image file path.
            pattern = r"_(\d{4}_\d{2}_\d{2})__(\d{2}_\d{2}_\d{2})"
            # Search for the pattern in the file path
            match = re.search(pattern, sample["filepath"])
            if match:
                date = match.group(1)
                timestamp = match.group(2)
                #print(f"Date: {date}")
                #print(f"Timestamp: {timestamp}")
            else:
                # NOTE(review): on no match, `date`/`timestamp` stay unset (or
                # stale from the previous sample), so building the row below
                # can raise NameError or reuse old values — confirm intent.
                print("No date and timestamp found in the file path.")
            for tag in sample["tags"]:
                # Check for prefixes: tags encode taxonomy/metadata as
                # "<PREFIX>_<value>" strings.
                if tag.startswith("KINGDOM"):
                    kingdom = tag[len("KINGDOM"):].strip('_')
                elif tag.startswith("PHYLUM"):
                    phylum = tag[len("PHYLUM"):].strip('_')
                elif tag.startswith("CLASS"):
                    tclass = tag[len("CLASS"):].strip('_')
                elif tag.startswith("ORDER"):
                    order = tag[len("ORDER"):].strip('_')
                    #print("found an ORDER tag!")
                elif tag.startswith("FAMILY"):
                    family = tag[len("FAMILY"):].strip('_')
                elif tag.startswith("GENUS"):
                    genus = tag[len("GENUS"):].strip('_')
                elif tag.startswith("SPECIES"):
                    species = tag[len("SPECIES"):].strip('_')
                elif tag.startswith("commonName"):
                    # NOTE(review): assigns common_name (snake_case) but the
                    # row below writes commonName — likely a bug; confirm.
                    common_name = tag[len("commonName"):].strip('_')
                elif tag.startswith("scientificName"):
                    scientificName = tag[len("scientificName"):].strip('_')
                elif tag.startswith("IDby"):
                    identified_by = tag[len("IDby"):].strip('_')
                elif tag.upper().startswith("ERROR"):
                    commonName="ERROR"
                    #print("found an error tag!")
                # Second, independent chain for lowercase-prefixed tags.
                if tag.startswith("taxonID"):
                    taxon_id = tag[len("taxonID"):].strip('_')
                elif tag.startswith("phylum"):
                    phylum = tag[len("phylum"):].strip('_')
            #fieldnames = ["basisOfRecord","datasetID","parentEventID","eventID","occurrenceID","verbatimEventDate","eventDate","eventTime","identifiedBy","taxonID","kingdom","phylum","class","order","family","genus","species","commonName","scientificName","filepath", "mothbox","software","sheet","country", "area", "point","latitude","longitude","height","deployment_name","deployment_date","sample_time","collect_date", "data_storage_location","crew", "notes", "schedule","habitat", "image_id", "label", "bbox", "segmentation"] # Adjust fieldnames as needed
            #print("sample")
            #deepest_rank, deepest_value = get_deepest_classification(kingdom, phylum,tclass, order, family, genus,species)
            # Find the deepest available rank and its taxonomic details
            taxa_values = [kingdom, phylum, tclass, order, family, genus, species]
            # NOTE(review): this overwrites any taxonID/scientificName parsed
            # from the tags above with the local lookup result — confirm.
            taxon_id, scientificName= find_deepest_taxon_info(TAXA, taxa_values, taxa_lookup)
            uniquedatasetID=create_uniquedatasetID(sample["deployment_name"],sample["_dataset_id"])
            # Adjust date and timestamp based on the UTC offset
            formattedUTC_dateTime = format_datetime_with_utc_offset(date, timestamp, float(sample["UTC"]))
            row = {
                "filepath":sample["filepath"],
                "basisOfRecord": "MachineObservation",
                "datasetID":uniquedatasetID,
                "parentEventID":sample["deployment_name"],
                "eventID":os.path.basename(sample['filepath_fullimage']),
                "occurrenceID":os.path.basename(sample["filepath"]),
                "verbatimEventDate":date+"__"+timestamp,
                "eventDate":formattedUTC_dateTime,
                "eventTime":formattedUTC_dateTime.split("T")[1],
                "UTCOFFSET":sample["UTC"],
                "mothbox":sample["device"],
                "software":sample["firmware"],
                "sheet":sample["sheet"],
                "country":sample["datasetcollection"],
                "area":sample["project"],
                "point":sample["site"],
                "latitude":sample["latitude"],
                "longitude":sample["longitude"],
                "ground_height":sample["ground_height"],
                "attractor":sample["attractor"],
                "attractor_location":sample["attractor_location"],
                "deployment_name":sample["deployment_name"],
                "deployment_date":sample["deployment_date"],
                #"sample_time":formattedUTC_dateTime,
                "collect_date":sample["collect_date"],
                "data_storage_location":sample["data_storage_location"],
                "crew":sample["crew"],
                "notes": "", #sample["notes"], #Disabled for now, hubert doesn't want notes for each critter sighting
                "schedule":sample["schedule"],
                "habitat":sample["habitat"],
                #detection specific
                "detectionBy": sample["detection_By"],
                "identifiedBy":identified_by,
                "ID_confidence": sample["confidence"],
                "kingdom":kingdom,
                "phylum":phylum,
                "class":tclass,
                "order":order,
                "family":family,
                "genus":genus,
                "species":species,
                "commonName":commonName,
                "scientificName":scientificName,
                "taxonID":taxon_id,
            }
            csv_writer.writerow(row)
    print(f"CSV file created: {output_file}")
# This code will only run if this script is executed directly
if __name__ == "__main__":
    args = parse_args()
    # Override the module-level defaults with the CLI-supplied values.
    INPUT_PATH=args.input_path
    TAXA_LIST_PATH=args.taxa_csv
    #UTC_OFFSET=int(args.utcoff)
    # Call the function with the input path
    json_to_csv(INPUT_PATH, TAXA_LIST_PATH)