[ PROMPT_NODE_24525 ]
Cocoindex
[ SKILL_DOCUMENTATION ]
# CocoIndex
## Overview
CocoIndex is an ultra-performant real-time data transformation framework for AI with incremental processing. This skill enables building **indexing flows** that extract data from sources, apply transformations (chunking, embedding, LLM extraction), and export to targets (vector databases, graph databases, relational databases).
**Core capabilities:**
1. **Write indexing flows** - Define ETL pipelines using Python
2. **Create custom functions** - Build reusable transformation logic
3. **Operate flows** - Run and manage flows using CLI or Python API
**Key features:**
- Incremental processing (only processes changed data)
- Live updates (continuously sync source changes to targets)
- Built-in functions (text chunking, embeddings, LLM extraction)
- Multiple data sources (local files, S3, Azure Blob, Google Drive, Postgres)
- Multiple targets (Postgres+pgvector, Qdrant, LanceDB, Neo4j, Kuzu)
**For detailed documentation:**
**Search documentation:**
## When to Use This Skill
Use when users request:
- "Build a vector search index for my documents"
- "Create an embedding pipeline for code/PDFs/images"
- "Extract structured information using LLMs"
- "Build a knowledge graph from documents"
- "Set up live document indexing"
- "Create custom transformation functions"
- "Run/update my CocoIndex flow"
## Flow Writing Workflow
### Step 1: Understand Requirements
Ask clarifying questions to understand:
**Data source:**
- Where is the data? (local files, S3, database, etc.)
- What file types? (text, PDF, JSON, images, code, etc.)
- How often does it change? (one-time, periodic, continuous)
**Transformations:**
- What processing is needed? (chunking, embedding, extraction, etc.)
- Which embedding model? (SentenceTransformer, OpenAI, custom)
- Any custom logic? (filtering, parsing, enrichment)
**Target:**
- Where should results go? (Postgres, Qdrant, Neo4j, etc.)
- What schema? (fields, primary keys, indexes)
- Vector search needed? (specify similarity metric)
### Step 2: Set Up Dependencies
Guide user to add CocoIndex with appropriate extras to their project based on their needs:
**Required dependency:**
- `cocoindex` - Core functionality, CLI, and most built-in functions
**Optional extras (add as needed):**
- `cocoindex[embeddings]` - For SentenceTransformer embeddings (when using `SentenceTransformerEmbed`)
- `cocoindex[colpali]` - For ColPali image/document embeddings (when using `ColPaliEmbedImage` or `ColPaliEmbedQuery`)
- `cocoindex[lancedb]` - For LanceDB target (when exporting to LanceDB)
- `cocoindex[embeddings,lancedb]` - Multiple extras can be combined
**What's included:**
- Base package: Core functionality, CLI, most built-in functions, Postgres/Qdrant/Neo4j/Kuzu targets
- `embeddings` extra: SentenceTransformers library for local embedding models
- `colpali` extra: ColPali engine for multimodal document/image embeddings
- `lancedb` extra: LanceDB client library for LanceDB vector database support
Users can install using their preferred package manager (pip, uv, poetry, etc.) or add to `pyproject.toml`.
**For installation details:**
### Step 3: Set Up Environment
**Check existing environment first:**
1. Check if `COCOINDEX_DATABASE_URL` exists in environment variables
- If not found, use default: `postgres://cocoindex:cocoindex@localhost/cocoindex`
2. **For flows requiring LLM APIs** (embeddings, extraction):
- Ask user which LLM provider they want to use:
- **OpenAI** - Both generation and embeddings
- **Anthropic** - Generation only
- **Gemini** - Both generation and embeddings
- **Voyage** - Embeddings only
- **Ollama** - Local models (generation and embeddings)
- Check if the corresponding API key exists in environment variables
- If not found, **ask user to provide the API key value**
- **Never create simplified examples without LLM** - always get the proper API key and use the real LLM functions
**Guide user to create `.env` file:**
```bash
# Database connection (required - internal storage)
COCOINDEX_DATABASE_URL=postgres://cocoindex:cocoindex@localhost/cocoindex
# LLM API keys (add the ones you need)
OPENAI_API_KEY=sk-... # For OpenAI (generation + embeddings)
ANTHROPIC_API_KEY=sk-ant-... # For Anthropic (generation only)
GOOGLE_API_KEY=... # For Gemini (generation + embeddings)
VOYAGE_API_KEY=pa-... # For Voyage (embeddings only)
# Ollama requires no API key (local)
```
**For more LLM options:**
Create basic project structure:
```python
# main.py
from dotenv import load_dotenv
import cocoindex
@cocoindex.flow_def(name="FlowName")
def my_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
# Flow definition here
pass
if __name__ == "__main__":
load_dotenv()
cocoindex.init()
my_flow.update()
```
### Step 4: Write the Flow
Follow this structure:
```python
@cocoindex.flow_def(name="DescriptiveName")
def flow_name(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
# 1. Import source data
data_scope["source_name"] = flow_builder.add_source(
cocoindex.sources.SourceType(...)
)
# 2. Create collector(s) for outputs
collector = data_scope.add_collector()
# 3. Transform data (iterate through rows)
with data_scope["source_name"].row() as item:
# Apply transformations
item["new_field"] = item["existing_field"].transform(
cocoindex.functions.FunctionName(...)
)
...
# Nested iteration (e.g., chunks within documents)
with item["nested_table"].row() as nested_item:
# More transformations
nested_item["embedding"] = nested_item["text"].transform(...)
# Collect data for export
collector.collect(
field1=nested_item["field1"],
field2=item["field2"],
generated_id=cocoindex.GeneratedField.UUID
)
# 4. Export to target
collector.export(
"target_name",
cocoindex.targets.TargetType(...),
primary_key_fields=["field1"],
vector_indexes=[...] # If needed
)
```
**Key principles:**
- Each source creates a field in the top-level data scope
- Use `.row()` to iterate through table data
- **CRITICAL: Always assign transformed data to row fields** - Use `item["new_field"] = item["existing_field"].transform(...)`, NOT local variables like `new_field = item["existing_field"].transform(...)`
- Transformations create new fields without mutating existing data
- Collectors gather data from any scope level
- Export must happen at top level (not within row iterations)
**Common mistakes to avoid:**
❌ **Wrong:** Using local variables for transformations
```python
with data_scope["files"].row() as file:
summary = file["content"].transform(...) # ❌ Local variable
summaries_collector.collect(filename=file["filename"], summary=summary)
```
✅ **Correct:** Assigning to row fields
```python
with data_scope["files"].row() as file:
file["summary"] = file["content"].transform(...) # ✅ Field assignment
summaries_collector.collect(filename=file["filename"], summary=file["summary"])
```
❌ **Wrong:** Creating unnecessary dataclasses to mirror flow fields
```python
from dataclasses import dataclass
@dataclass
class FileSummary: # ❌ Unnecessary - CocoIndex manages fields automatically
filename: str
summary: str
embedding: list[float]
# This dataclass is never used in the flow!
```
### Step 5: Design the Flow Solution
**IMPORTANT:** The patterns listed below are common starting points, but **you cannot exhaustively enumerate all possible scenarios**. When user requirements don't match existing patterns:
1. **Combine elements from multiple patterns** - Mix and match sources, transformations, and targets creatively
2. **Review additional examples** - See for diverse real-world use cases (face recognition, multimodal search, product recommendations, patient form extraction, etc.)
3. **Think from first principles** - Use the core APIs (sources, transforms, collectors, exports) and apply common sense to solve novel problems
4. **Be creative** - CocoIndex is flexible; unique combinations of components can solve unique problems
**Common starting patterns (use references for detailed examples):**
**For text embedding:** Load `references/flow_patterns.md` and refer to "Pattern 1: Simple Text Embedding"
**For code embedding:** Load `references/flow_patterns.md` and refer to "Pattern 2: Code Embedding with Language Detection"
**For LLM extraction + knowledge graph:** Load `references/flow_patterns.md` and refer to "Pattern 3: LLM-based Extraction to Knowledge Graph"
**For live updates:** Load `references/flow_patterns.md` and refer to "Pattern 4: Live Updates with Refresh Interval"
**For custom functions:** Load `references/flow_patterns.md` and refer to "Pattern 5: Custom Transform Function"
**For reusable query logic:** Load `references/flow_patterns.md` and refer to "Pattern 6: Transform Flow for Reusable Logic"
**For concurrency control:** Load `references/flow_patterns.md` and refer to "Pattern 7: Concurrency Control"
**Example of pattern composition:**
If a user asks to "index images from S3, generate captions with a vision API, and store in Qdrant", combine:
- AmazonS3 source (from S3 examples)
- Custom function for vision API calls (from custom functions pattern)
- EmbedText to embed the captions (from embedding patterns)
- Qdrant target (from target examples)
No single pattern covers this exact scenario, but the building blocks are composable.
### Step 6: Test and Run
Guide user through testing:
```bash
# 1. Run with setup
cocoindex update --setup -f main # -f force setup without confirmation prompts
# 2. Start a server and redirect users to CocoInsight
cocoindex server -ci main
# Then open CocoInsight at https://cocoindex.io/cocoinsight
```
## Data Types
CocoIndex has a type system independent of programming languages. All data types are determined at flow definition time, making schemas clear and predictable.
**IMPORTANT: When to define types:**
- **Custom functions**: Type annotations are **required** for return values (these are the source of truth for type inference)
- **Flow fields**: Type annotations are **NOT needed** - CocoIndex automatically infers types from sources, functions, and transformations
- **Dataclasses/Pydantic models**: Only create them when they're **actually used** (as function parameters/returns or ExtractByLlm output_type), NOT to mirror flow field schemas
**Type annotation requirements:**
- **Return values of custom functions**: Must use **specific type annotations** - these are the source of truth for type inference
- **Arguments of custom functions**: Relaxed - can use `Any`, `dict[str, Any]`, or omit annotations; engine already knows the types
- **Flow definitions**: No explicit type annotations needed - CocoIndex automatically infers types from sources and functions
**Why specific return types matter:** Custom function return types let CocoIndex infer field types throughout the flow without processing real data. This enables creating proper target schemas (e.g., vector indexes with fixed dimensions).
**Common type categories:**
1. **Primitive types**: `str`, `int`, `float`, `bool`, `bytes`, `datetime.date`, `datetime.datetime`, `uuid.UUID`
2. **Vector types** (embeddings): Specify dimension in return type if you plan to export as vectors to targets, as most targets require a fixed vector dimension
- `cocoindex.Vector[cocoindex.Float32, typing.Literal[768]]` - 768-dim float32 vector (recommended)
- `list[float]` without dimension also works
3. **Struct types**: Dataclass, NamedTuple, or Pydantic model
- Return type: Must use specific class (e.g., `Person`)
- Argument: Can use `dict[str, Any]` or `Any`
4. **Table types**:
- **KTable** (keyed): `dict[K, V]` where K = key type (primitive or frozen struct), V = Struct type
- **LTable** (ordered): `list[R]` where R = Struct type
- Arguments: Can use `dict[Any, Any]` or `list[Any]`
5. **Json type**: `cocoindex.Json` for unstructured/dynamic data
6. **Optional types**: `T | None` for nullable values
**Examples:**
```python
from dataclasses import dataclass
from typing import Literal
import cocoindex
@dataclass
class Person:
name: str
age: int
# ✅ Vector with dimension (recommended for vector search)
@cocoindex.op.function(behavior_version=1)
def embed_text(text: str) -> cocoindex.Vector[cocoindex.Float32, Literal[768]]:
"""Generate 768-dim embedding - dimension needed for vector index."""
# ... embedding logic ...
return embedding # numpy array or list of 768 floats
# ✅ Struct return type, relaxed argument
@cocoindex.op.function(behavior_version=1)
def process_person(person: dict[str, Any]) -> Person:
"""Argument can be dict[str, Any], return must be specific Struct."""
return Person(name=person["name"], age=person["age"])
# ✅ LTable return type
@cocoindex.op.function(behavior_version=1)
def filter_people(people: list[Any]) -> list[Person]:
"""Return type specifies list of specific Struct."""
return [p for p in people if p.age >= 18]
# ❌ Wrong: dict[str, str] is not a valid specific CocoIndex type
# @cocoindex.op.function(...)
# def bad_example(person: Person) -> dict[str, str]:
# return {"name": person.name}
```
**For comprehensive data types documentation:**
## Custom Functions
When users need custom transformation logic, create custom functions.
### Decision: Standalone vs Spec+Executor
**Use standalone function when:**
- Simple transformation
- No configuration needed
- No setup/initialization required
**Use spec+executor when:**
- Needs configuration (model names, API endpoints, parameters)
- Requires setup (loading models, establishing connections)
- Complex multi-step processing
### Creating Standalone Functions
```python
@cocoindex.op.function(behavior_version=1)
def my_function(input_arg: str, optional_arg: int | None = None) -> dict:
"""
Function description.
Args:
input_arg: Description
optional_arg: Optional description
"""
# Transformation logic
return {"result": f"processed-{input_arg}"}
```
**Requirements:**
- Decorator: `@cocoindex.op.function()`
- Type annotations on all arguments and return value
- Optional parameters: `cache=True` for expensive ops, `behavior_version` (required with cache)
### Creating Spec+Executor Functions
```python
# 1. Define configuration spec
class MyFunction(cocoindex.op.FunctionSpec):
"""Configuration for MyFunction."""
model_name: str
threshold: float = 0.5
# 2. Define executor
@cocoindex.op.executor_class(cache=True, behavior_version=1)
class MyFunctionExecutor:
spec: MyFunction # Required: link to spec
model = None # Instance variables for state
def prepare(self) -> None:
"""Optional: run once before execution."""
# Load model, setup connections, etc.
self.model = load_model(self.spec.model_name)
def __call__(self, text: str) -> dict:
"""Required: execute for each data row."""
# Use self.spec for configuration
# Use self.model for loaded resources
result = self.model.process(text)
return {"result": result}
```
**When to enable cache:**
- LLM API calls
- Model inference
- External API calls
- Computationally expensive operations
**Important:** Increment `behavior_version` when function logic changes to invalidate cache.
For detailed examples and patterns, load `references/custom_functions.md`.
**For more on custom functions:**
## Operating Flows
### CLI Operations
**Setup flow (create resources):**
```bash
cocoindex setup main
```
**One-time update:**
```bash
cocoindex update main
# With auto-setup
cocoindex update --setup main
# Force reset everything before setup and update
cocoindex update --reset main
```
**Live update (continuous monitoring):**
```bash
cocoindex update main.py -L
# Requires refresh_interval on source or source-specific change capture
```
**Drop flow (remove all resources):**
```bash
cocoindex drop main.py
```
**Inspect flow:**
```bash
cocoindex show main.py:FlowName
```
**Test without side effects:**
```bash
cocoindex evaluate main.py:FlowName --output-dir ./test_output
```
For complete CLI reference, load `references/cli_operations.md`.
**For CLI documentation:**
### API Operations
**Basic setup:**
```python
from dotenv import load_dotenv
import cocoindex
load_dotenv()
cocoindex.init()
@cocoindex.flow_def(name="MyFlow")
def my_flow(flow_builder, data_scope):
# ... flow definition ...
pass
```
**One-time update:**
```python
stats = my_flow.update()
print(f"Processed {stats.total_rows} rows")
# Async
stats = await my_flow.update_async()
```
**Live update:**
```python
# As context manager
with cocoindex.FlowLiveUpdater(my_flow) as updater:
# Updater runs in background
# Your application logic here
pass
# Manual control
updater = cocoindex.FlowLiveUpdater(
my_flow,
cocoindex.FlowLiveUpdaterOptions(
live_mode=True,
print_stats=True
)
)
updater.start()
# ... application logic ...
updater.wait()
```
**Setup/drop:**
```python
my_flow.setup(report_to_stdout=True)
my_flow.drop(report_to_stdout=True)
cocoindex.setup_all_flows()
cocoindex.drop_all_flows()
```
**Query with transform flows:**
```python
@cocoindex.transform_flow()
def text_to_embedding(text: cocoindex.DataSlice[str]) -> cocoindex.DataSlice[list[float]]:
return text.transform(
cocoindex.functions.SentenceTransformerEmbed(model="...")
)
# Use in flow for indexing
doc["embedding"] = text_to_embedding(doc["content"])
# Use for querying
query_embedding = text_to_embedding.eval("search query")
```
For complete API reference and patterns, load `references/api_operations.md`.
**For API documentation:**
## Built-in Functions
### Text Processing
**SplitRecursively** - Chunk text intelligently
```python
doc["chunks"] = doc["content"].transform(
cocoindex.functions.SplitRecursively(),
language="markdown", # or "python", "javascript", etc.
chunk_size=2000,
chunk_overlap=500
)
```
**ParseJson** - Parse JSON strings
```python
data = json_string.transform(cocoindex.functions.ParseJson())
```
**DetectProgrammingLanguage** - Detect language from filename
```python
file["language"] = file["filename"].transform(
cocoindex.functions.DetectProgrammingLanguage()
)
```
### Embeddings
**SentenceTransformerEmbed** - Local embedding model
```python
# Requires: cocoindex[embeddings]
chunk["embedding"] = chunk["text"].transform(
cocoindex.functions.SentenceTransformerEmbed(
model="sentence-transformers/all-MiniLM-L6-v2"
)
)
```
**EmbedText** - LLM API embeddings
This is the **recommended way** to generate embeddings using LLM APIs (OpenAI, Voyage, etc.).
```python
chunk["embedding"] = chunk["text"].transform(
cocoindex.functions.EmbedText(
api_type=cocoindex.LlmApiType.OPENAI,
model="text-embedding-3-small",
)
)
```
**ColPaliEmbedImage** - Multimodal image embeddings
```python
# Requires: cocoindex[colpali]
image["embedding"] = image["img_bytes"].transform(
cocoindex.functions.ColPaliEmbedImage(model="vidore/colpali-v1.2")
)
```
### LLM Extraction
**ExtractByLlm** - Extract structured data with LLM
This is the **recommended way** to use LLMs for extraction and summarization tasks. It supports both structured outputs (dataclasses, Pydantic models) and simple text outputs (str).
```python
import dataclasses
# For structured extraction
@dataclasses.dataclass
class ProductInfo:
name: str
price: float
category: str
item["product_info"] = item["text"].transform(
cocoindex.functions.ExtractByLlm(
llm_spec=cocoindex.LlmSpec(
api_type=cocoindex.LlmApiType.OPENAI,
model="gpt-4o-mini"
),
output_type=ProductInfo,
instruction="Extract product information"
)
)
# For text summarization/generation
file["summary"] = file["content"].transform(
cocoindex.functions.ExtractByLlm(
llm_spec=cocoindex.LlmSpec(
api_type=cocoindex.LlmApiType.OPENAI,
model="gpt-4o-mini"
),
output_type=str,
instruction="Summarize this document in one paragraph"
)
)
```
## Common Sources and Targets
**Browse all sources:**
**Browse all targets:**
### Sources
**LocalFile:**
```python
cocoindex.sources.LocalFile(
path="documents",
included_patterns=["*.md", "*.txt"],
excluded_patterns=["**/.*", "node_modules"]
)
```
**AmazonS3:**
```python
cocoindex.sources.AmazonS3(
bucket="my-bucket",
prefix="documents/",
aws_access_key_id=cocoindex.add_transient_auth_entry("..."),
aws_secret_access_key=cocoindex.add_transient_auth_entry("...")
)
```
**Postgres:**
```python
cocoindex.sources.Postgres(
connection=cocoindex.add_auth_entry("conn", cocoindex.sources.PostgresConnection(...)),
query="SELECT id, content FROM documents"
)
```
### Targets
**Postgres (with vector support):**
```python
collector.export(
"target_name",
cocoindex.targets.Postgres(),
primary_key_fields=["id"],
vector_indexes=[
cocoindex.VectorIndexDef(
field_name="embedding",
metric=cocoindex.VectorSimilarityMetric.COSINE_SIMILARITY
)
]
)
```
**Qdrant:**
```python
collector.export(
"target_name",
cocoindex.targets.Qdrant(collection_name="my_collection"),
primary_key_fields=["id"]
)
```
**LanceDB:**
```python
# Requires: cocoindex[lancedb]
collector.export(
"target_name",
cocoindex.targets.LanceDB(uri="lancedb_data", table_name="my_table"),
primary_key_fields=["id"]
)
```
**Neo4j (nodes):**
```python
collector.export(
"nodes",
cocoindex.targets.Neo4j(
connection=neo4j_conn,
mapping=cocoindex.targets.Nodes(label="Entity")
),
primary_key_fields=["id"]
)
```
**Neo4j (relationships):**
```python
collector.export(
"relationships",
cocoindex.targets.Neo4j(
connection=neo4j_conn,
mapping=cocoindex.targets.Relationships(
rel_type="RELATES_TO",
source=cocoindex.targets.NodeFromFields(
label="Entity",
fields=[cocoindex.targets.TargetFieldMapping(source="source_id", target="id")]
),
target=cocoindex.targets.NodeFromFields(
label="Entity",
fields=[cocoindex.targets.TargetFieldMapping(source="target_id", target="id")]
)
)
),
primary_key_fields=["id"]
)
```
## Common Issues and Solutions
### "Flow not found"
- Check APP_TARGET format: `cocoindex show main.py`
- Use `--app-dir` if not in project root
- Verify flow name matches decorator
### "Database connection failed"
- Check `.env` has `COCOINDEX_DATABASE_URL`
- Test connection: `psql $COCOINDEX_DATABASE_URL`
- Use `--env-file` to specify custom location
### "Schema mismatch"
- Re-run setup: `cocoindex setup main.py`
- Drop and recreate: `cocoindex drop main.py && cocoindex setup main.py`
### "Live update exits immediately"
- Add `refresh_interval` to source
- Or use source-specific change capture (Postgres notifications, S3 events)
### "Out of memory"
- Add concurrency limits on sources: `max_inflight_rows`, `max_inflight_bytes`
- Set global limits in `.env`: `COCOINDEX_SOURCE_MAX_INFLIGHT_ROWS`
## Reference Documentation
This skill includes comprehensive reference documentation for common patterns and operations:
- **references/flow_patterns.md** - Complete examples of common flow patterns (text embedding, code embedding, knowledge graphs, live updates, concurrency control, etc.)
- **references/custom_functions.md** - Detailed guide for creating custom functions with examples (standalone functions, spec+executor pattern, LLM calls, external APIs, caching)
- **references/cli_operations.md** - Complete CLI reference with all commands, options, and workflows
- **references/api_operations.md** - Python API reference with examples for programmatic flow control, live updates, queries, and application integration patterns
Load these references when users need:
- Detailed examples of specific patterns
- Complete API documentation
- Advanced usage scenarios
- Troubleshooting guidance
**For comprehensive documentation:**
**Search specific topics:**
Source: claude-code-templates (MIT). See About Us for full credits.