51 changes: 51 additions & 0 deletions deployment/docker-build/cleaner.dockerfile
@@ -0,0 +1,51 @@
FROM ubuntu:24.04 AS base

ENV DEBIAN_FRONTEND=noninteractive

RUN apt-get update && apt-get -y upgrade && apt-get install -y software-properties-common
RUN add-apt-repository -y ppa:deadsnakes/ppa

# Runtime + build packages
RUN apt-get update && apt-get -y upgrade && apt-get install -y \
    libpq5 \
    python3.12

FROM base AS builder

# Build-only packages
RUN apt-get update && apt-get install -y \
    build-essential \
    python3.12-dev \
    python3.12-venv \
    libpq-dev

# Create virtualenv
RUN python3.12 -m venv /opt/venv

# Install pip
ENV PIP_NO_CACHE_DIR=yes
RUN /opt/venv/bin/python3.12 -m pip install --upgrade pip wheel
ENV PATH="/opt/venv/bin:${PATH}"

# Install only the minimal dependencies needed for ipfs_pin_cleaner.py
RUN pip install \
    aioipfs~=0.7.1 \
    asyncpg==0.30

FROM base

RUN groupadd -g 1000 -o aleph
RUN useradd -s /bin/bash -u 1000 -g 1000 -o aleph

COPY --from=builder --chown=aleph /opt/venv /opt/venv

# Copy only the ipfs_pin_cleaner.py script
RUN mkdir -p /opt/cleaner/deployment/scripts
COPY --chown=aleph ./ipfs_pin_cleaner.py /opt/cleaner/deployment/scripts/

ENV PATH="/opt/venv/bin:${PATH}"
WORKDIR /opt/cleaner
USER aleph

# Default entrypoint to run the cleaner script
ENTRYPOINT ["python3.12", "deployment/scripts/ipfs_pin_cleaner.py"]
232 changes: 232 additions & 0 deletions deployment/scripts/ipfs_pin_cleaner.py
@@ -0,0 +1,232 @@
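"""
Experimental IPFS pin cleaner.

Compares the recursive pins held by an IPFS node against the file hashes
recorded in the aleph.im PostgreSQL database, prints the differences, and
optionally unpins orphaned CIDs (--unpin) and pins missing ones (--pin).
Both actions default to a dry run.
"""
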
import argparse
import asyncio
import logging
import os

import aioipfs
import asyncpg

# Configure logging to provide clear output
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

DATABASE_DSN = "postgres://USERNAME:PASSWORD@localhost:8080/aleph"
IPFS_API = "/ip4/127.0.0.1/tcp/5001"

async def get_ipfs_pins(api_addr: str) -> set[str]:
    """
    Connects to an IPFS instance and retrieves a set of recursively pinned CIDs.

    Args:
        api_addr: The multiaddress of the IPFS API endpoint (e.g., '/ip4/127.0.0.1/tcp/5001').

    Returns:
        A set of recursively pinned CIDs (as strings).
    """
    logging.info(f"Connecting to IPFS API at {api_addr}...")
    client = None
    try:
        client = aioipfs.AsyncIPFS(maddr=api_addr)
        # The 'pintype' argument filters for recursive pins directly in the
        # API call. The response is a dict whose 'Keys' entry maps each
        # pinned CID to its pin info.
        pin_list = await client.pin.ls(pintype='recursive', quiet=True)
        pins = set(pin_list['Keys'].keys())
        logging.info(f"Found {len(pins)} recursively pinned files in IPFS.")
        return pins
    except Exception as e:
        logging.error(f"Failed to connect or retrieve pins from IPFS: {e}")
        return set()
    finally:
        if client:
            await client.close()
            logging.info("IPFS client connection closed.")


async def get_database_hashes(dsn: str) -> set[str]:
    """
    Connects to a PostgreSQL database and retrieves a set of file hashes that should be pinned.

    Args:
        dsn: The PostgreSQL connection string.

    Returns:
        A set of file hashes (as strings) that should be pinned.
    """
    logging.info("Connecting to PostgreSQL database...")
    conn = None
    try:
        conn = await asyncpg.connect(dsn)
        # Stricter variant of the query, kept for reference: it only keeps
        # hashes pinned through a STORE message with item_type 'ipfs'.
        # query = """
        #     SELECT f.hash FROM file_pins fp
        #     INNER JOIN files f ON f.hash = fp.file_hash
        #     INNER JOIN messages m ON m.item_hash = fp.item_hash
        #     WHERE m."type" = 'STORE' and m."content"->>'item_type' = 'ipfs'
        # """
query = """
SELECT f.hash FROM files f
WHERE f.hash like 'Qm%' or f.hash like 'bafkrei%' \
"""
rows = await conn.fetch(query)
hashes = {row['hash'] for row in rows}
logging.info(f"Found {len(hashes)} files that should be pinned in the database.")
return hashes
except Exception as e:
logging.error(f"Failed to connect or query the database: {e}")
return set()
finally:
if conn:
await conn.close()
logging.info("Database connection closed.")


async def unpin_files(api_addr: str, cids_to_unpin: list):
    """
    Removes pins for a given list of CIDs from the IPFS node.

    Args:
        api_addr: The multiaddress of the IPFS API endpoint.
        cids_to_unpin: A list of CID strings to unpin.
    """
    if not cids_to_unpin:
        logging.info("No files to unpin.")
        return

    logging.info(f"Connecting to IPFS API at {api_addr} to unpin files...")
    client = None
    try:
        client = aioipfs.AsyncIPFS(maddr=api_addr)
        for cid in cids_to_unpin:
            try:
                logging.warning(f"Unpinning {cid}...")
                await client.pin.rm(cid)
                logging.info(f"Successfully unpinned {cid}.")
            except Exception as e:
                logging.error(f"Failed to unpin {cid}: {e}")
    except Exception as e:
        logging.error(f"Failed to connect to IPFS for unpinning: {e}")
    finally:
        if client:
            await client.close()
            logging.info("IPFS client connection closed after unpinning.")

async def pin_files(api_addr: str, cids_to_pin: list):
    """
    Pins a given list of CIDs to the IPFS node.

    Args:
        api_addr: The multiaddress of the IPFS API endpoint.
        cids_to_pin: A list of CID strings to pin.
    """
    if not cids_to_pin:
        logging.info("No files to pin.")
        return

    logging.info(f"Connecting to IPFS API at {api_addr} to pin files...")
    client = None
    try:
        client = aioipfs.AsyncIPFS(maddr=api_addr)
        for cid in cids_to_pin:
            try:
                logging.info(f"Pinning {cid}...")
                # The 'add' method pins recursively by default and yields
                # status messages: progress updates carry a 'Progress' key,
                # while the final message carries 'Pins' instead.
                async for status in client.pin.add(cid):
                    if 'Progress' in status:
                        print('Pin progress', status['Progress'])
                logging.info(f"Successfully pinned {cid}.")
            except Exception as e:
                logging.error(f"Failed to pin {cid}: {e}")
    except Exception as e:
        logging.error(f"Failed to connect to IPFS for pinning: {e}")
    finally:
        if client:
            await client.close()
            logging.info("IPFS client connection closed after pinning.")


async def main():
    """
    Main function to orchestrate the IPFS pin synchronization process.
    """
    parser = argparse.ArgumentParser(
        description="Compares IPFS pins with a database record and optionally syncs the state."
    )
    # IPFS arguments
    parser.add_argument(
        '--ipfs-api',
        default=os.getenv('IPFS_API', IPFS_API),
        help="IPFS API multiaddress (default: /ip4/127.0.0.1/tcp/5001)"
    )
    # PostgreSQL arguments from environment variables for security
    parser.add_argument(
        '--db-dsn',
        default=os.getenv('DATABASE_DSN', DATABASE_DSN),
        help="PostgreSQL DSN (e.g., 'postgres://user:pass@host:port/dbname'). "
             "Can also be set via the DATABASE_DSN environment variable."
    )
    # Action arguments
    parser.add_argument(
        '--unpin',
        action='store_true',
        help="Actually perform the unpinning of files. Default is a dry run."
    )
    parser.add_argument(
        '--pin',
        action='store_true',
        help="Actually perform the pinning of missing files. Default is a dry run."
    )
    args = parser.parse_args()

    if not args.db_dsn:
        logging.error("Database DSN must be provided via the --db-dsn argument or the DATABASE_DSN environment variable.")
        return

    # Get the two sets of hashes/CIDs
    ipfs_pins = await get_ipfs_pins(args.ipfs_api)
    db_hashes = await get_database_hashes(args.db_dsn)

    if not ipfs_pins and not db_hashes:
        logging.warning("Both IPFS and database checks returned empty sets. Exiting.")
        return

    # --- 1. Check for files in IPFS that should be UNPINNED ---
    pins_to_remove = ipfs_pins - db_hashes

    if not pins_to_remove:
        logging.info("All pinned files are correctly referenced in the database.")
    else:
        logging.warning(f"Found {len(pins_to_remove)} files to UNPIN (in IPFS, not in DB):")
        for cid in pins_to_remove:
            print(f" - {cid}")

        if args.unpin:
            logging.info("--- UNPINNING ENABLED ---")
            await unpin_files(args.ipfs_api, list(pins_to_remove))
            logging.info("--- UNPINNING PROCESS COMPLETE ---")
        else:
            logging.info("-> This was a dry run. Use --unpin to remove them.")

    print("-" * 50)

    # --- 2. Check for files in DB that should be PINNED ---
    hashes_to_add = db_hashes - ipfs_pins

    if not hashes_to_add:
        logging.info("All necessary files from the database are already pinned in IPFS.")
    else:
        logging.warning(f"Found {len(hashes_to_add)} files to PIN (in DB, not in IPFS):")
        for cid in hashes_to_add:
            print(f" + {cid}")

        if args.pin:
            logging.info("--- PINNING ENABLED ---")
            await pin_files(args.ipfs_api, list(hashes_to_add))
            logging.info("--- PINNING PROCESS COMPLETE ---")
        else:
            logging.info("-> This was a dry run. Use --pin to add them.")

    # --- Final summary ---
    logging.info(f"Summary: {len(pins_to_remove)} files to unpin, {len(hashes_to_add)} files to pin.")


if __name__ == "__main__":
    asyncio.run(main())
102 changes: 102 additions & 0 deletions deployment/scripts/run_ipfs_cleanup.sh
@@ -0,0 +1,102 @@
#!/bin/bash

# Exit immediately if a command exits with a non-zero status.
set -e
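
# Assumes docker and docker-compose are installed, and that this script is
# run from the pyaleph deployment directory (next to docker-compose.yml),
# so the docker-compose stop/up commands below act on the right stack.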

# --- Initial Setup & User Input ---
echo "This script will clean unpinned IPFS objects and reclaim disk space."
echo "--------------------------------------------------------------------"
read -p "Enter your DATABASE_USERNAME (default: aleph): " DB_USER
DB_USER=${DB_USER:-aleph}

read -s -p "Enter your DATABASE_PASSWORD: " DB_PASS
echo
if [ -z "$DB_PASS" ]; then
echo "❌ Error: Database password cannot be empty."
exit 1
fi

# --- 1. Measure Initial Space Usage ---
echo -e "\n📊 Checking initial disk space usage..."
IPFS_CONTAINER=$(docker ps --format "{{.Names}}" | grep ipfs | head -n 1)
if [ -z "$IPFS_CONTAINER" ]; then
    echo "❌ Error: Could not find a running IPFS container."
    exit 1
fi

# Find the volume name mounted to the IPFS container
IPFS_VOLUME=$(docker inspect -f '{{range .Mounts}}{{if eq .Destination "/data/ipfs"}}{{.Name}}{{end}}{{end}}' "$IPFS_CONTAINER")
if [ -z "$IPFS_VOLUME" ]; then
echo "❌ Error: Could not find the IPFS data volume for container '$IPFS_CONTAINER'."
exit 1
fi

# Use 'docker system df -v' to find the volume's reported size
# We grep for the volume name and get the last column containing the size
INITIAL_SIZE_HR=$(docker system df -v | grep -A 9999 "VOLUME" | grep -w "$IPFS_VOLUME" | awk '{print $NF}')

if [ -z "$INITIAL_SIZE_HR" ]; then
echo " - ⚠️ Warning: Could not determine initial size from 'docker system df'."
INITIAL_SIZE_HR="N/A"
fi
echo " - IPFS Volume: $IPFS_VOLUME"
echo " - Initial Size (from docker df): $INITIAL_SIZE_HR"

# --- 2. Download Files ---
echo -e "\n⬇️ Downloading Dockerfile and cleaner script..."
wget -q --show-progress -O cleaner.dockerfile "https://raw.githubusercontent.com/aleph-im/pyaleph/refs/heads/andres-feature-implement_experimental_ipfs_pin_cleaner/deployment/docker-build/cleaner.dockerfile"
wget -q --show-progress -O ipfs_pin_cleaner.py "https://raw.githubusercontent.com/aleph-im/pyaleph/refs/heads/andres-feature-implement_experimental_ipfs_pin_cleaner/deployment/scripts/ipfs_pin_cleaner.py"

# --- 3. Build Docker Image ---
echo -e "\n🛠️ Building 'ipfs-pin-cleaner' Docker image..."
docker build -f cleaner.dockerfile -t ipfs-pin-cleaner . > /dev/null
echo " - Image built successfully."

# --- 4. Stop Containers ---
echo -e "\n🛑 Stopping non-essential containers..."
docker-compose stop pyaleph pyaleph-api p2p-service rabbitmq redis

# --- 5. Get Network and IPFS Info ---
echo -e "\n🔎 Identifying network and IPFS container details..."
PYALEPH_NETWORK=$(docker network list --format "{{.Name}}" | grep pyaleph | head -n 1)
IPFS_IP=$(docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' "$IPFS_CONTAINER")
echo " - Network: $PYALEPH_NETWORK, IPFS IP: $IPFS_IP"

# --- 6. Run IPFS Pin Cleaner ---
echo -e "\n🧹 Running the IPFS pin cleaner (this may take a while)..."
docker run --rm --network "$PYALEPH_NETWORK" \
    -e DATABASE_DSN="postgres://${DB_USER}:${DB_PASS}@postgres:5432/aleph" \
    -e IPFS_API="/ip4/${IPFS_IP}/tcp/5001" \
    ipfs-pin-cleaner --unpin
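# Note: running the cleaner without --unpin performs a dry run that only
# lists the CIDs that would be removed (see ipfs_pin_cleaner.py).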

# --- 7. Execute IPFS Garbage Collector ---
echo -e "\n🗑️ Executing IPFS garbage collector..."
docker exec -it "$IPFS_CONTAINER" ipfs repo gc
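# Unpinning alone does not free disk space; 'ipfs repo gc' deletes the
# blocks that are no longer referenced by any pin.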

# --- 8. Measure Final Space ---
echo -e "\n📊 Checking final disk space usage..."
# A small sleep can give Docker's daemon time to update its disk usage stats
sleep 5
FINAL_SIZE_HR=$(docker system df -v | grep -A 9999 "VOLUME" | grep -w "$IPFS_VOLUME" | awk '{print $NF}')

if [ -z "$FINAL_SIZE_HR" ]; then
echo " - ⚠️ Warning: Could not determine final size from 'docker system df'."
FINAL_SIZE_HR="N/A"
fi
echo " - Final Size (from docker df): $FINAL_SIZE_HR"

# --- 9. Restart All Containers ---
echo -e "\n🚀 Starting all services..."
docker-compose up -d

# --- 10. Cleanup ---
echo -e "\n✨ Cleaning up temporary files..."
rm cleaner.dockerfile ipfs_pin_cleaner.py

# --- Final Summary ---
echo -e "\n------------------- SUMMARY -------------------"
echo -e "Initial size reported: \033[1;31m$INITIAL_SIZE_HR\033[0m"
echo -e "Final size reported: \033[1;32m$FINAL_SIZE_HR\033[0m"
echo -e "\nℹ️ Compare the values above to see the reclaimed space."
echo "-----------------------------------------------"
echo "✅ All tasks finished successfully!"