Documentation Index
Fetch the complete documentation index at: https://avala.ai/docs/llms.txt
Use this file to discover all available pages before exploring further.
Practical code examples for working with the Avala platform using the official SDKs and REST API. Covers basic operations, advanced patterns like retry logic and async batch processing, and production workflows.
Setup
from avala import Client

# With no arguments, Client() picks up the AVALA_API_KEY environment variable.
client = Client()
Dataset Management
List Datasets
Retrieve all datasets accessible to your account.
# One page of up to 20 datasets; iterate it to print each entry.
datasets = client.datasets.list(limit=20)
for dataset in datasets:
    print(f"{dataset.name} ({dataset.uid})")
Get Dataset Details
Fetch details for a specific dataset by UID.
# Look up one dataset by its UID and show a couple of its fields.
dataset = client.datasets.get("dataset-uid-here")
print(f"Name: {dataset.name}")
print(f"Slug: {dataset.slug}")
Project Workflows
List Projects
Retrieve projects accessible to your account.
# Every project visible to the authenticated account.
projects = client.projects.list()
for project in projects:
    print(f"{project.name} ({project.uid})")
Get Project Metrics
Check the progress and quality metrics for a project.
import requests
import os

# Project metrics are available via the REST API
response = requests.get(
    "https://api.avala.ai/api/v1/projects/proj-uuid-001/metrics/",
    headers={"X-Avala-Api-Key": os.environ["AVALA_API_KEY"]},
)
# Fail fast on HTTP errors instead of indexing into an error payload below.
response.raise_for_status()
metrics = response.json()

# Guard against division by zero for a brand-new project with no tasks yet.
total_tasks = metrics["total_tasks"]
completion = (metrics["completed_tasks"] / total_tasks) * 100 if total_tasks else 0.0
print(f"Progress: {completion:.1f}%")
print(f"Acceptance rate: {metrics['acceptance_rate'] * 100:.1f}%")
Export Pipeline
Create Export, Poll for Completion, and Download
A complete workflow for exporting annotation data from a project.
import time

# Create the export
export = client.exports.create(project="proj-uuid-001")
print(f"Export started: {export.uid}")

# Poll until the export reaches a terminal state.
while True:
    export = client.exports.get(export.uid)
    if export.status == "completed":
        print(f"Download URL: {export.download_url}")
        break
    elif export.status == "failed":
        # RuntimeError rather than bare Exception, matching the
        # export_project example further down this page.
        raise RuntimeError("Export failed")
    time.sleep(5)
Annotation Retrieval
List Tasks
Retrieve tasks for a project, optionally filtering by status.
# Only tasks in the "completed" state for this project.
completed_tasks = client.tasks.list(project="proj-uuid-001", status="completed")
for task in completed_tasks:
    print(f"Task {task.uid}")
Organization Management
List Members
Retrieve all members of your organization.
import requests
import os

# Organization management is available via the REST API
response = requests.get(
    "https://api.avala.ai/api/v1/organizations/acme-ai/members/",
    headers={"X-Avala-Api-Key": os.environ["AVALA_API_KEY"]},
)
# Surface HTTP errors before indexing "results" out of the body.
response.raise_for_status()
members = response.json()["results"]
for member in members:
    print(f"{member['user']['username']} - {member['role']}")
Send Invitation
Invite a new member to your organization.
import requests
import os

response = requests.post(
    "https://api.avala.ai/api/v1/organizations/acme-ai/invitations/",
    headers={
        "X-Avala-Api-Key": os.environ["AVALA_API_KEY"],
        "Content-Type": "application/json",
    },
    json={
        "email": "newuser@example.com",
        "role": "annotator"
    }
)
# Raise on HTTP errors (e.g. insufficient permissions) before reading the body.
response.raise_for_status()
invitation = response.json()
print(f"Invitation sent to {invitation['email']}")
Error Handling
Using SDK Error Classes
Handle errors using the SDK’s built-in error classes.
from avala import Client
from avala.errors import AvalaError, AuthenticationError, NotFoundError, RateLimitError

client = Client()

try:
    dataset = client.datasets.get("nonexistent-uid")
except NotFoundError:
    # The UID does not resolve to a dataset this account can see.
    print("Dataset not found")
except RateLimitError:
    # Back off and retry once the rate-limit window resets.
    print("Rate limited — try again later")
except AuthenticationError:
    # The API rejected the key.
    print("Invalid API key")
except AvalaError as e:
    # Catch-all base class for any other API failure — keep this clause last.
    print(f"API error: {e}")
Iterate Through All Results
Fetch all pages of a paginated endpoint.
# CursorPage exposes has_more/next_cursor, so walk the pages until exhausted.
all_datasets = []
page = client.datasets.list(limit=20)
while True:
    all_datasets.extend(page)
    if not page.has_more:
        break
    page = client.datasets.list(cursor=page.next_cursor, limit=20)
print(f"Total datasets: {len(all_datasets)}")
Advanced Patterns
Retry with Exponential Backoff
The SDKs raise RateLimitError when you hit the rate limit. Build a retry wrapper that respects the Retry-After header.
import time
import random
from avala.errors import RateLimitError, ServerError


def with_retry(fn, max_retries=5):
    """Call ``fn()`` with exponential backoff on rate-limit or server errors.

    Args:
        fn: Zero-argument callable to invoke.
        max_retries: Maximum number of attempts before giving up.

    Returns:
        Whatever ``fn()`` returns on the first successful attempt.

    Raises:
        The last RateLimitError/ServerError seen once retries are exhausted.
    """
    last_error = None
    for attempt in range(max_retries):
        try:
            return fn()
        except RateLimitError as e:
            last_error = e
            # Honour the server-provided Retry-After when present; otherwise
            # back off exponentially with jitter to avoid a thundering herd.
            wait = e.retry_after or (2 ** attempt + random.random())
            reason = "Rate limited"
        except ServerError as e:
            last_error = e
            wait = 2 ** attempt + random.random()
            reason = "Server error"
        # Bug fix: don't sleep (or claim "Retrying") after the final attempt —
        # we are about to give up, so the wait was pure wasted time.
        if attempt < max_retries - 1:
            print(f"{reason}. Retrying in {wait:.1f}s (attempt {attempt + 1}/{max_retries})")
            time.sleep(wait)
    raise last_error or Exception(f"Failed after {max_retries} retries")


# Usage
dataset = with_retry(lambda: client.datasets.get("ds_abc123"))
Async Batch Processing
Use the async client to process multiple items concurrently with controlled parallelism.
import asyncio
from avala import AsyncClient


async def main():
    async with AsyncClient() as client:
        # Walk the paginated listing to collect every dataset UID.
        dataset_uids = []
        page = await client.datasets.list(limit=50)
        while True:
            dataset_uids.extend(ds.uid for ds in page)
            if not page.has_more:
                break
            page = await client.datasets.list(cursor=page.next_cursor, limit=50)

        # Fetch details in bounded batches of 10 so we stay under rate limits.
        batch_size = 10
        results = []
        for start in range(0, len(dataset_uids), batch_size):
            chunk = dataset_uids[start : start + batch_size]
            fetched = await asyncio.gather(*(client.datasets.get(uid) for uid in chunk))
            results.extend(fetched)

        for ds in results:
            print(f"{ds.name}: {ds.item_count} items")


asyncio.run(main())
Export with Timeout and Error Recovery
A production-ready export workflow with a timeout, progress logging, and proper error handling.
import time
from avala.errors import AvalaError


def export_project(client, project_uid, timeout_seconds=600, poll_interval=5):
    """Export a project and block until the export finishes.

    Returns the download URL on success.
    Raises RuntimeError if the export fails, TimeoutError if it does not
    complete within *timeout_seconds*.
    """
    export = client.exports.create(project=project_uid)
    print(f"Export {export.uid} started")
    started = time.time()
    deadline = started + timeout_seconds
    while time.time() < deadline:
        export = client.exports.get(export.uid)
        if export.status == "completed":
            print(f"Export completed: {export.download_url}")
            return export.download_url
        if export.status == "failed":
            raise RuntimeError(f"Export {export.uid} failed")
        # Log progress on every non-terminal poll.
        elapsed = time.time() - started
        print(f"  status={export.status} ({elapsed:.0f}s elapsed)")
        time.sleep(poll_interval)
    raise TimeoutError(f"Export {export.uid} did not complete within {timeout_seconds}s")


# Usage
try:
    url = export_project(client, "proj_abc123", timeout_seconds=300)
except TimeoutError:
    print("Export timed out — try again or contact support")
except AvalaError as e:
    print(f"API error: {e}")
Upload Items via REST API
The SDKs focus on read operations. To upload items to a dataset, use the REST API directly.
import os
import requests
from pathlib import Path

API_KEY = os.environ["AVALA_API_KEY"]
BASE = "https://api.avala.ai/api/v1"
HEADERS = {"X-Avala-Api-Key": API_KEY}


def upload_items(dataset_uid, file_paths):
    """Upload each file in *file_paths* to the dataset and return new item UIDs."""
    uploaded = []
    for raw_path in file_paths:
        path = Path(raw_path)
        with path.open("rb") as f:
            response = requests.post(
                f"{BASE}/datasets/{dataset_uid}/items/",
                headers=HEADERS,
                files={"file": (path.name, f)},
            )
        response.raise_for_status()
        item = response.json()
        uploaded.append(item["uid"])
        print(f"Uploaded {path.name} -> {item['uid']}")
    return uploaded


# Upload all PNGs from a directory
images = sorted(Path("./training-data").glob("*.png"))
item_uids = upload_items("ds_abc123", images)
print(f"Uploaded {len(item_uids)} items")
Batch Export Multiple Projects
Export several projects in parallel and wait for all to complete.
import asyncio
import time
from avala import AsyncClient


async def export_and_wait(client, project_uid, timeout=600, poll_interval=5):
    """Start an export for *project_uid* and poll until a terminal state or timeout."""
    export = await client.exports.create(project=project_uid)
    deadline = time.time() + timeout
    while time.time() < deadline:
        export = await client.exports.get(export.uid)
        if export.status == "completed":
            return {"project": project_uid, "url": export.download_url}
        if export.status == "failed":
            return {"project": project_uid, "error": "Export failed"}
        await asyncio.sleep(poll_interval)
    return {"project": project_uid, "error": "Timed out"}


async def main():
    project_uids = ["proj_001", "proj_002", "proj_003"]
    async with AsyncClient() as client:
        # Run all exports concurrently; gather preserves input order.
        pending = [export_and_wait(client, uid) for uid in project_uids]
        results = await asyncio.gather(*pending)
        for result in results:
            if "url" in result:
                print(f"{result['project']}: {result['url']}")
            else:
                print(f"{result['project']}: FAILED — {result['error']}")


asyncio.run(main())
Monitor Rate Limit Usage
Check your remaining rate limit budget before starting batch operations.
# Any request refreshes the client's cached rate-limit info.
client.datasets.list(limit=1)

info = client.rate_limit_info
# Headers may be missing/None on a fresh client; fall back to 0.
remaining = int(info.get("remaining") or 0)
limit = int(info.get("limit") or 0)
print(f"Rate limit: {remaining}/{limit} requests remaining")

if remaining < 50:
    print("Warning: low rate limit budget. Consider slowing down requests.")