Avala SDK & REST API Code Examples
Practical code examples for working with the Avala platform using the official SDKs and REST API. Covers basic operations, advanced patterns like retry logic and async batch processing, and production workflows.

Setup

from avala import Client

# With no arguments, the client authenticates using the AVALA_API_KEY
# environment variable — no key needs to appear in source code.
client = Client()  # reads AVALA_API_KEY env var

Dataset Management

List Datasets

Retrieve all datasets accessible to your account.
# Iterate the first page of datasets (up to 20 results).
for dataset in client.datasets.list(limit=20):
    print(f"{dataset.name} ({dataset.uid})")

Get Dataset Details

Fetch details for a specific dataset by UID.
# Look up a single dataset by UID (the value printed by datasets.list()).
dataset = client.datasets.get("dataset-uid-here")
print(f"Name: {dataset.name}")
print(f"Slug: {dataset.slug}")

Project Workflows

List Projects

Retrieve projects accessible to your account.
# Walk the first page of projects visible to this API key.
for project in client.projects.list():
    print(f"{project.name} ({project.uid})")

Get Project Metrics

Check the progress and quality metrics for a project.
import requests
import os

# Project metrics are available via the REST API (not the SDK).
response = requests.get(
    "https://api.avala.ai/api/v1/projects/proj-uuid-001/metrics/",
    headers={"X-Avala-Api-Key": os.environ["AVALA_API_KEY"]},
    timeout=30,  # fail fast instead of hanging on a stalled connection
)
response.raise_for_status()  # surface 4xx/5xx instead of parsing an error body
metrics = response.json()

# Guard against a brand-new project with no tasks yet (avoids ZeroDivisionError).
total = metrics["total_tasks"]
completion = (metrics["completed_tasks"] / total) * 100 if total else 0.0
print(f"Progress: {completion:.1f}%")
print(f"Acceptance rate: {metrics['acceptance_rate'] * 100:.1f}%")

Export Pipeline

Create Export, Poll for Completion, and Download

A complete workflow for exporting annotation data from a project.
import time

# Kick off the export job.
export = client.exports.create(project="proj-uuid-001")
print(f"Export started: {export.uid}")

# Poll every 5 seconds until the job reaches a terminal state.
while True:
    export = client.exports.get(export.uid)

    if export.status == "failed":
        raise Exception("Export failed")
    if export.status == "completed":
        print(f"Download URL: {export.download_url}")
        break

    time.sleep(5)

Annotation Retrieval

List Tasks

Retrieve tasks for a project, optionally filtering by status.
# Completed tasks only; omit status= to list every task in the project.
for task in client.tasks.list(project="proj-uuid-001", status="completed"):
    print(f"Task {task.uid}")

Organization Management

List Members

Retrieve all members of your organization.
import requests
import os

# Organization management is available via the REST API (not the SDK).
response = requests.get(
    "https://api.avala.ai/api/v1/organizations/acme-ai/members/",
    headers={"X-Avala-Api-Key": os.environ["AVALA_API_KEY"]},
    timeout=30,  # fail fast instead of hanging on a stalled connection
)
response.raise_for_status()  # surface 4xx/5xx before reading the body
members = response.json()["results"]
for member in members:
    print(f"{member['user']['username']} - {member['role']}")

Send Invitation

Invite a new member to your organization.
import requests
import os

response = requests.post(
    "https://api.avala.ai/api/v1/organizations/acme-ai/invitations/",
    headers={
        "X-Avala-Api-Key": os.environ["AVALA_API_KEY"],
        "Content-Type": "application/json",
    },
    json={
        "email": "newuser@example.com",
        "role": "annotator",
    },
    timeout=30,  # fail fast instead of hanging on a stalled connection
)
# A rejected invite (4xx) would otherwise KeyError on 'email' below.
response.raise_for_status()
invitation = response.json()
print(f"Invitation sent to {invitation['email']}")

Error Handling

Using SDK Error Classes

Handle errors using the SDK’s built-in error classes.
from avala import Client
from avala.errors import AvalaError, AuthenticationError, NotFoundError, RateLimitError

client = Client()

# Order matters: catch the specific error classes first and keep the
# broad AvalaError handler last so it does not shadow them.
try:
    dataset = client.datasets.get("nonexistent-uid")
except NotFoundError:
    # The UID does not match any dataset accessible to this key.
    print("Dataset not found")
except RateLimitError:
    # Too many requests; back off before retrying.
    print("Rate limited — try again later")
except AuthenticationError:
    # The configured API key was rejected.
    print("Invalid API key")
except AvalaError as e:
    # Catch-all for any other SDK-reported API failure.
    print(f"API error: {e}")

Pagination

Iterate Through All Results

Fetch all pages of a paginated endpoint.
# CursorPage exposes has_more / next_cursor, so we can follow the
# cursor chain until every page has been consumed.
all_datasets = []
page = client.datasets.list(limit=20)

while True:
    all_datasets.extend(page)
    if not page.has_more:
        break
    page = client.datasets.list(cursor=page.next_cursor, limit=20)

print(f"Total datasets: {len(all_datasets)}")

Advanced Patterns

Retry with Exponential Backoff

The SDKs raise RateLimitError when you hit the rate limit. Build a retry wrapper that respects the Retry-After header.
import time
import random
from avala.errors import RateLimitError, ServerError

def with_retry(fn, max_retries=5):
    """Call fn() with exponential backoff on rate-limit or server errors.

    Args:
        fn: Zero-argument callable to invoke.
        max_retries: Total number of attempts before giving up.

    Returns:
        The result of the first successful fn() call.

    Raises:
        RateLimitError or ServerError: the last error seen, once all
        attempts are exhausted.
    """
    last_error = None
    for attempt in range(max_retries):
        try:
            return fn()
        except (RateLimitError, ServerError) as e:
            last_error = e
            if attempt == max_retries - 1:
                break  # out of retries — raise immediately, don't sleep first
            if isinstance(e, RateLimitError):
                label = "Rate limited"
                # Honor the server-provided Retry-After when present.
                wait = e.retry_after or (2 ** attempt + random.random())
            else:
                label = "Server error"
                # Exponential backoff with jitter to avoid thundering herds.
                wait = 2 ** attempt + random.random()
            print(f"{label}. Retrying in {wait:.1f}s (attempt {attempt + 1}/{max_retries})")
            time.sleep(wait)
    raise last_error or Exception(f"Failed after {max_retries} retries")

# Usage: wrap the call in a zero-argument lambda so arguments are bound.
dataset = with_retry(lambda: client.datasets.get("ds_abc123"))

Async Batch Processing

Use the async client to process multiple items concurrently with controlled parallelism.
import asyncio
from avala import AsyncClient

async def main():
    """Fetch every dataset's details concurrently, ten at a time."""
    async with AsyncClient() as client:
        # Gather every dataset UID, following cursors to the last page.
        dataset_uids = []
        page = await client.datasets.list(limit=50)
        while True:
            dataset_uids.extend(ds.uid for ds in page)
            if not page.has_more:
                break
            page = await client.datasets.list(cursor=page.next_cursor, limit=50)

        # Fetch details in chunks of 10 so we stay under the rate limit.
        results = []
        for start in range(0, len(dataset_uids), 10):
            chunk = dataset_uids[start : start + 10]
            fetched = await asyncio.gather(
                *(client.datasets.get(uid) for uid in chunk)
            )
            results.extend(fetched)

        for ds in results:
            print(f"{ds.name}: {ds.item_count} items")

asyncio.run(main())

Export with Timeout and Error Recovery

A production-ready export workflow with a timeout, progress logging, and proper error handling.
import time
from avala.errors import AvalaError

def export_project(client, project_uid, timeout_seconds=600, poll_interval=5):
    """Start an export and block until it finishes.

    Polls the export every `poll_interval` seconds, logging progress.
    Returns the download URL once the export completes.
    Raises RuntimeError if the export fails, or TimeoutError if it does
    not reach a terminal state within `timeout_seconds`.
    """
    export = client.exports.create(project=project_uid)
    print(f"Export {export.uid} started")

    started = time.time()
    while time.time() - started < timeout_seconds:
        export = client.exports.get(export.uid)

        if export.status == "failed":
            raise RuntimeError(f"Export {export.uid} failed")
        if export.status == "completed":
            print(f"Export completed: {export.download_url}")
            return export.download_url

        elapsed = time.time() - started
        print(f"  status={export.status} ({elapsed:.0f}s elapsed)")
        time.sleep(poll_interval)

    raise TimeoutError(f"Export {export.uid} did not complete within {timeout_seconds}s")

# Usage: a timeout is retryable; an AvalaError means the API rejected us.
try:
    url = export_project(client, "proj_abc123", timeout_seconds=300)
except TimeoutError:
    print("Export timed out — try again or contact support")
except AvalaError as e:
    print(f"API error: {e}")

Upload Items via REST API

The SDKs focus on read operations. To upload items to a dataset, use the REST API directly.
import os
import requests
from pathlib import Path

# Shared REST configuration: every request authenticates with the
# X-Avala-Api-Key header against the v1 API base URL.
API_KEY = os.environ["AVALA_API_KEY"]
BASE = "https://api.avala.ai/api/v1"
HEADERS = {"X-Avala-Api-Key": API_KEY}

def upload_items(dataset_uid, file_paths, timeout=60):
    """Upload a list of files to a dataset via the REST API.

    Args:
        dataset_uid: UID of the target dataset.
        file_paths: Iterable of file paths (str or Path) to upload.
        timeout: Per-request timeout in seconds (keyword, defaults to 60).

    Returns:
        List of item UIDs created, in upload order.

    Raises:
        requests.HTTPError: If the API rejects any upload.
    """
    uploaded = []
    for path in file_paths:
        path = Path(path)
        with open(path, "rb") as f:
            response = requests.post(
                f"{BASE}/datasets/{dataset_uid}/items/",
                headers=HEADERS,
                files={"file": (path.name, f)},
                timeout=timeout,  # never hang forever on a stalled upload
            )
        response.raise_for_status()
        item = response.json()
        uploaded.append(item["uid"])
        print(f"Uploaded {path.name} -> {item['uid']}")
    return uploaded

# Upload all PNGs from a directory (sorted for a deterministic order).
images = sorted(Path("./training-data").glob("*.png"))
item_uids = upload_items("ds_abc123", images)
print(f"Uploaded {len(item_uids)} items")

Batch Export Multiple Projects

Export several projects in parallel and wait for all to complete.
import asyncio
import time
from avala import AsyncClient

async def export_and_wait(client, project_uid, timeout=600, poll_interval=5):
    """Start an export and poll until it completes, fails, or times out.

    Always returns a dict with a "project" key plus either "url"
    (success) or "error" (failure/timeout) — never raises.
    """
    export = await client.exports.create(project=project_uid)
    deadline = time.time() + timeout
    while time.time() < deadline:
        export = await client.exports.get(export.uid)
        if export.status == "failed":
            return {"project": project_uid, "error": "Export failed"}
        if export.status == "completed":
            return {"project": project_uid, "url": export.download_url}
        await asyncio.sleep(poll_interval)
    return {"project": project_uid, "error": "Timed out"}

async def main():
    """Export three projects concurrently and report each outcome."""
    project_uids = ["proj_001", "proj_002", "proj_003"]

    async with AsyncClient() as client:
        jobs = [export_and_wait(client, uid) for uid in project_uids]
        results = await asyncio.gather(*jobs)

    for result in results:
        if "url" in result:
            print(f"{result['project']}: {result['url']}")
        else:
            print(f"{result['project']}: FAILED — {result['error']}")

asyncio.run(main())

Monitor Rate Limit Usage

Check your remaining rate limit budget before starting batch operations.
# Make any request to populate rate limit info
client.datasets.list(limit=1)
info = client.rate_limit_info

remaining = int(info.get("remaining") or 0)
limit = int(info.get("limit") or 0)
print(f"Rate limit: {remaining}/{limit} requests remaining")

if remaining < 50:
    print("Warning: low rate limit budget. Consider slowing down requests.")