SDK & API Reference

Official SDK and REST API for Compresr.

Installation

pip install compresr

Two Client Types

Choose the right client based on your use case:

CompressionClient

Token-level compression. Keeps the most important tokens, with full control over the compression rate.

Models: espresso_v1 (query-agnostic), latte_v1 (query-specific)

FilterClient

Chunk-level filtering. Keeps or drops entire chunks by query relevance without modifying content.

Model: coldbrew_v1

1. Query-Agnostic Compression

from compresr import CompressionClient

client = CompressionClient(api_key="cmp_your_api_key")

result = client.compress(
    context="Your very long context that needs compression...",
    compression_model_name="espresso_v1",
    target_compression_ratio=0.5  # optional, default: 0.5
)

print(f"Original: {result.data.original_tokens} tokens")
print(f"Compressed: {result.data.compressed_tokens} tokens")
print(f"Saved: {result.data.tokens_saved} tokens")
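The arithmetic behind those fields is straightforward. Assuming target_compression_ratio is the compressed-to-original token ratio (as the 0.5 default suggests), a back-of-the-envelope estimator looks like this (estimate_savings is a hypothetical helper, not part of the SDK):

```python
def estimate_savings(original_tokens: int, target_ratio: float) -> dict:
    """Estimate compressed size and savings for a given target ratio.

    Assumes target_compression_ratio = compressed_tokens / original_tokens.
    """
    compressed = round(original_tokens * target_ratio)
    return {
        "original_tokens": original_tokens,
        "compressed_tokens": compressed,
        "tokens_saved": original_tokens - compressed,
    }

# A 4,000-token context at the default 0.5 ratio:
print(estimate_savings(4000, 0.5))
# {'original_tokens': 4000, 'compressed_tokens': 2000, 'tokens_saved': 2000}
```

The actual counts come back on result.data; this sketch is only for sizing expectations before you call the API.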

2. Query-Specific Compression

from compresr import CompressionClient

client = CompressionClient(api_key="cmp_your_api_key")

context = """The James Webb Space Telescope was launched on December 25, 2021 aboard an Ariane 5 rocket. It cost $10 billion and took 20 years to develop. JWST orbits the Sun at the L2 Lagrange point, 1.5 million km from Earth. Its primary mirror spans 6.5 meters across 18 gold-plated beryllium segments. In 2023, it discovered galaxies dating to just 300 million years after the Big Bang. The sunshield is the size of a tennis court, keeping instruments at -233°C. NASA, ESA, and CSA operate the mission jointly. The telescope observes in infrared wavelengths between 0.6 and 28.3 micrometers. Over 5,000 scientists submitted observation proposals in the first year."""

result = client.compress(
    context=context,
    query="What are the key engineering specs of the JWST?",
    compression_model_name="latte_v1",
    target_compression_ratio=0.5  # optional, default: 0.5
)

print(f"Compressed: {result.data.compressed_context}")
print(f"Saved: {result.data.tokens_saved} tokens")

3. Chunk-Level Filtering

from compresr import FilterClient

client = FilterClient(api_key="cmp_your_api_key")

chunks = [
    "The 737 MAX production rate increased from 26 to 31 per month in 2022.",
    "Boeing's headquarters cafeteria serves three meal options daily.",
    "Defense revenue was $23.2B, driven by F-15EX and KC-46A tanker programs.",
    "The company softball team won the regional championship in August.",
    "Free cash flow improved to -$3.5B from -$7.7B, reflecting 787 delivery ramp.",
    "Employees are encouraged to use the south parking garage on Fridays.",
]

result = client.filter(
    chunks=chunks,
    query="What are Boeing's key financial and production metrics?",
    compression_model_name="coldbrew_v1"
)

# Keeps 3 of 6 chunks — drops cafeteria, softball, and parking
print(f"Kept chunks: {result.data.compressed_context}")  # List[str]
print(f"Saved: {result.data.tokens_saved} tokens")
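FilterClient expects pre-split chunks, so if you start from one long document you need to chunk it first. A minimal paragraph-based splitter (an illustrative pre-processing helper, not part of the SDK):

```python
def split_into_chunks(text: str, max_chars: int = 500) -> list[str]:
    """Split text on blank lines, merging paragraphs up to max_chars.

    Illustrative helper; swap in your own chunking strategy as needed.
    """
    paragraphs = [p.strip() for p in text.split("\n\n") if p.strip()]
    chunks, current = [], ""
    for para in paragraphs:
        if current and len(current) + len(para) + 2 > max_chars:
            chunks.append(current)
            current = para
        else:
            current = f"{current}\n\n{para}" if current else para
    if current:
        chunks.append(current)
    return chunks
```

The resulting list can be passed directly as the chunks argument to filter().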

Streaming

CompressionClient supports streaming for real-time output; FilterClient does not:

from compresr import CompressionClient

# CompressionClient streaming
client = CompressionClient(api_key="cmp_your_api_key")
for chunk in client.compress_stream(
    context="Your long context...",
    compression_model_name="espresso_v1",
    target_compression_ratio=0.5  # optional, default: 0.5
):
    print(chunk.content, end="", flush=True)

# Query-specific streaming
for chunk in client.compress_stream(
    context="Your context...",
    query="What is important?",
    compression_model_name="latte_v1",
    target_compression_ratio=0.5  # optional, default: 0.5
):
    print(chunk.content, end="", flush=True)

# FilterClient does not support streaming (use filter() instead)
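Each streamed chunk carries a .content attribute, so recovering the full compressed string is just accumulation. A sketch with a stubbed stream standing in for client.compress_stream(...) (no API call is made; StreamChunk and fake_stream are illustrative stand-ins):

```python
from dataclasses import dataclass
from typing import Iterator

@dataclass
class StreamChunk:
    content: str

def fake_stream() -> Iterator[StreamChunk]:
    # Stand-in for client.compress_stream(...); yields pieces of output.
    for piece in ["Compressed ", "context ", "here."]:
        yield StreamChunk(piece)

def collect(stream: Iterator[StreamChunk]) -> str:
    """Accumulate streamed pieces into the full compressed string."""
    return "".join(chunk.content for chunk in stream)

print(collect(fake_stream()))  # Compressed context here.
```

In real code you would pass the generator returned by compress_stream() to collect() instead of fake_stream().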

Async / Await

Both clients support async usage for non-blocking operations:

import asyncio
from compresr import CompressionClient, FilterClient

async def main():
    # Async compression
    client = CompressionClient(api_key="cmp_your_api_key")
    result = await client.compress_async(
        context="Your long context...",
        compression_model_name="espresso_v1",
        target_compression_ratio=0.5  # optional, default: 0.5
    )
    print(f"Compressed: {result.data.compressed_tokens} tokens")

    # Async filtering
    filter_client = FilterClient(api_key="cmp_your_api_key")
    result = await filter_client.filter_async(
        chunks=["Chunk 1...", "Chunk 2..."],
        query="What is relevant?"
    )
    print(f"Kept: {len(result.data.compressed_context)} chunks")

asyncio.run(main())
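The async methods also make it easy to compress several contexts concurrently with asyncio.gather. A sketch with a stub in place of client.compress_async (fake_compress_async is illustrative, not the SDK; it just halves the input so the example runs offline):

```python
import asyncio

async def fake_compress_async(context: str) -> str:
    # Stand-in for client.compress_async(...); returns a shortened context.
    await asyncio.sleep(0)
    return context[: len(context) // 2]

async def compress_many(contexts: list[str]) -> list[str]:
    """Compress several contexts concurrently with asyncio.gather."""
    return await asyncio.gather(*(fake_compress_async(c) for c in contexts))

results = asyncio.run(compress_many(["abcdefgh", "12345678"]))
print(results)  # ['abcd', '1234']
```

With the real client, replace fake_compress_async with client.compress_async(context=..., compression_model_name=...); gather preserves input order in its results.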

Workflow Integration

Integrate Compresr into your existing LLM workflows. Works with any LLM provider.

Query-Agnostic (System Prompts)

from compresr import CompressionClient

compresr = CompressionClient(api_key="cmp_xxx")

# Compress your system prompt or context
compressed = compresr.compress(
    context="Your long system prompt...",
    compression_model_name="espresso_v1",
    target_compression_ratio=0.5  # optional, default: 0.5
)

# Use with any LLM provider
messages = [
    {"role": "system", "content": compressed.data.compressed_context},
    {"role": "user", "content": "Your question..."}
]

# Pass to OpenAI, Anthropic, or any other LLM
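Token savings translate directly into per-request cost savings on input tokens. A back-of-the-envelope calculator (cost_saved is a hypothetical helper; the $/1M-token price is an assumed placeholder, not any provider's actual rate):

```python
def cost_saved(tokens_saved: int, price_per_million: float) -> float:
    """Dollar savings per request, given tokens saved by compression.

    price_per_million is an assumed input-token price in USD per 1M tokens.
    """
    return tokens_saved * price_per_million / 1_000_000

# 2,000 tokens saved at a hypothetical $3.00 / 1M input tokens:
print(f"${cost_saved(2000, 3.00):.4f}")  # $0.0060
```

Multiply by request volume to estimate aggregate savings; the per-request number is small, but it compounds quickly at scale.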

Query-Specific (RAG/QA)

from compresr import CompressionClient

compresr = CompressionClient(api_key="cmp_xxx")

user_question = "What is machine learning?"

# Compress retrieved documents based on the query
compressed = compresr.compress(
    context="Retrieved documents from your vector DB...",
    query=user_question,
    compression_model_name="latte_v1",
    target_compression_ratio=0.5  # optional, default: 0.5
)

# Use with any LLM provider
messages = [
    {"role": "system", "content": compressed.data.compressed_context},
    {"role": "user", "content": user_question}
]

Chunk Filtering (RAG Pre-filter)

from compresr import FilterClient

filter_client = FilterClient(api_key="cmp_xxx")

user_question = "What is machine learning?"

# Filter retrieved chunks — keep only relevant ones
filtered = filter_client.filter(
    chunks=["Chunk about ML...", "Chunk about cooking...", "Chunk about AI..."],
    query=user_question
)

# filtered.data.compressed_context is List[str] of kept chunks
context = "\n".join(filtered.data.compressed_context)
messages = [
    {"role": "system", "content": context},
    {"role": "user", "content": user_question}
]
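Even after filtering, the kept chunks can exceed your model's context budget. A sketch that packs chunks in order until an approximate token budget is reached (pack_to_budget is illustrative, and the 4-characters-per-token estimate is a rough assumption, not the SDK's tokenizer):

```python
def pack_to_budget(chunks: list[str], max_tokens: int) -> str:
    """Join chunks in order, stopping before an approximate token budget.

    Uses a rough 4-chars-per-token estimate; swap in a real tokenizer
    for accurate counts.
    """
    packed, used = [], 0
    for chunk in chunks:
        est_tokens = max(1, len(chunk) // 4)
        if used + est_tokens > max_tokens:
            break
        packed.append(chunk)
        used += est_tokens
    return "\n".join(packed)
```

Apply it to filtered.data.compressed_context before building the system message when your prompt budget is tight.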

License

Proprietary License - see LICENSE for details.