Tuesday, March 18, 2025

I Built an Open-Source ETL to Prepare Data for RAG


I’ve built an open-source ETL framework, CocoIndex, with a friend to prepare data for AI applications such as semantic search and retrieval-augmented generation (RAG). It offers a data-driven programming model that simplifies the creation and maintenance of data indexing pipelines, ensuring data freshness and consistency.


🔥 Key Features:

  • Data flow programming
  • Support for custom logic - you can plug in your own choice of chunking, embedding, and vector stores, composing your own logic like Lego bricks. We have three examples in the repo for now. In the long run, we also want to support deduplication, reconciliation, etc.
  • Incremental updates. We provide state management out of the box to minimize re-computation. Right now, it checks whether a file from a data source has been updated; in the future, this will work at a finer granularity, e.g., at the chunk level.
  • Python SDK (Rust core 🦀 with Python bindings 🐍)
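
To illustrate the incremental-update idea above, here is a minimal sketch of content-fingerprint state tracking in plain Python. The function names are hypothetical and this is not CocoIndex's actual implementation (the framework manages this state for you); it only shows how re-computation can be skipped for unchanged files:

```python
import hashlib

def content_fingerprint(data: bytes) -> str:
    """Stable fingerprint of a file's contents."""
    return hashlib.sha256(data).hexdigest()

def files_to_reprocess(files: dict[str, bytes], state: dict[str, str]) -> list[str]:
    """Return files whose contents changed since the last run, updating state in place."""
    changed = []
    for name, data in files.items():
        fp = content_fingerprint(data)
        if state.get(name) != fp:
            state[name] = fp
            changed.append(name)
    return changed
```

On a second run with the same state dict, only files whose bytes changed are returned, so downstream chunking and embedding can be skipped for everything else.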

🔗 GitHub Repo: CocoIndex

I also created a video tutorial to go with the Quickstart Guide.


🚀 Getting Started

  1. Installation: Install the CocoIndex Python library:
   pip install cocoindex
  2. Set Up Postgres with pgvector Extension: Ensure Docker Compose is installed, then start a Postgres database:
   docker compose -f <(curl -L https://raw.githubusercontent.com/cocoindex-io/cocoindex/refs/heads/main/dev/postgres.yaml) up -d
  3. Define Your Indexing Flow: Create a flow to index your data. For example:
   @cocoindex.flow_def(name="TextEmbedding")
   def text_embedding(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
       # Read markdown files from a local directory as the data source.
       data_scope["documents"] = flow_builder.add_source(cocoindex.sources.LocalFile(path="markdown_files"))
       doc_embeddings = data_scope.add_collector()

       with data_scope["documents"].row() as doc:
           # Split each document into overlapping chunks.
           doc["chunks"] = doc["content"].transform(
               cocoindex.functions.SplitRecursively(language="markdown", chunk_size=300, chunk_overlap=100))

           with doc["chunks"].row() as chunk:
               # Embed each chunk with a sentence-transformers model.
               chunk["embedding"] = chunk["text"].transform(
                   cocoindex.functions.SentenceTransformerEmbed(model="sentence-transformers/all-MiniLM-L6-v2"))

               doc_embeddings.collect(filename=doc["filename"], location=chunk["location"],
                                      text=chunk["text"], embedding=chunk["embedding"])

       # Export the collected rows to Postgres with a cosine-similarity vector index.
       doc_embeddings.export(
           "doc_embeddings",
           cocoindex.storages.Postgres(),
           primary_key_fields=["filename", "location"],
           vector_index=[("embedding", cocoindex.VectorSimilarityMetric.COSINE_SIMILARITY)])
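
To give intuition for the chunk_size=300 / chunk_overlap=100 parameters in the flow above, here is a naive fixed-window splitter in plain Python. CocoIndex's SplitRecursively is structure-aware (it respects markdown boundaries), so this hypothetical sketch only shows the size/overlap mechanics, not the real algorithm:

```python
def split_with_overlap(text: str, chunk_size: int = 300, chunk_overlap: int = 100) -> list[tuple[int, str]]:
    """Split text into (start_offset, chunk) pairs of at most chunk_size
    characters, where consecutive chunks share chunk_overlap characters."""
    step = chunk_size - chunk_overlap  # how far the window advances each time
    chunks = []
    for start in range(0, len(text), step):
        chunks.append((start, text[start:start + chunk_size]))
        if start + chunk_size >= len(text):
            break  # last window already reached the end of the text
    return chunks
```

Overlap ensures that a sentence falling on a chunk boundary still appears intact in at least one chunk, which tends to improve retrieval quality at the cost of some index size.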

I'm sincerely looking for feedback and would love to learn from your thoughts. Thank you so much!

