Elasticsearch Python API: A Hands-On Tutorial

Learn to use the Elasticsearch Python API with this hands-on guide. Covers connection, indexing, searching, bulk ingestion, and a real-world example.

You already have the hard part done. Your OCR or document extraction system is producing structured JSON from invoices, delivery notes, KYC files, or claims paperwork. The problem starts right after that, when the data lands in files, object storage, or a relational table that isn't built for search.

That’s where the Elasticsearch Python API becomes useful. It gives Python teams a practical way to index document data, search it with real relevance logic, and run operational analytics without building a custom search layer from scratch.

Getting Started with the Elasticsearch Python API

If your pipeline ends with JSON, Elasticsearch gives that data a second life. Instead of treating extracted documents as records you only retrieve by ID, you can search across vendor names, filter by dates, aggregate totals by supplier, and inspect document content with full-text queries.

The official Python client is elasticsearch-py. It was first released in 2013 alongside Elasticsearch 1.0, and recent stable documentation covers version 9.3.0. That long lifecycle matters because it means the client isn’t a thin wrapper around a trend. It’s a mature interface for indexing, searching, and cluster operations through Elasticsearch’s REST APIs, with Python-friendly request handling and helpers for real workloads (official client API docs).

A common pattern looks like this:

  1. A document pipeline extracts fields from PDFs or images.
  2. Python normalizes the output into consistent JSON.
  3. Elasticsearch stores those documents for search and analytics.
  4. An internal app, back-office tool, or customer-facing API queries the index.

For teams coming from file-based workflows, the jump is significant. You go from “find the right PDF” to “show all invoices from this vendor with late payment language and a total amount above a threshold.”
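
Step 2 of that pattern, normalization, is where most of the later search quality is decided. Here is a minimal sketch of what it could look like. The function name, the accepted date formats, and the assumption that amounts use "." as the decimal separator and "," for thousands are illustrative choices, not something your extraction tool guarantees.

```python
from datetime import datetime
from decimal import Decimal, InvalidOperation


def normalize_invoice(raw: dict) -> dict:
    """Coerce loosely typed extraction output into a consistent shape."""
    # Dates: try a few common formats and emit ISO yyyy-MM-dd.
    date_text = str(raw.get("invoice_date") or "")
    date_value = None
    for fmt in ("%Y-%m-%d", "%d/%m/%Y", "%d.%m.%Y"):
        try:
            date_value = datetime.strptime(date_text, fmt).date().isoformat()
            break
        except ValueError:
            continue

    # Amounts: strip thousands separators, then parse; None if unparseable.
    try:
        amount = float(Decimal(str(raw.get("total_amount", "")).replace(",", "")))
    except InvalidOperation:
        amount = None

    return {
        "document_id": str(raw["document_id"]),
        "vendor_name": (raw.get("vendor_name") or "").strip(),
        "invoice_date": date_value,
        "total_amount": amount,
    }
```

Returning None for values that cannot be parsed keeps bad data out of typed fields, and it gives you something explicit to count and review before indexing.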

What the client is good at

The elasticsearch python api is strongest when you need direct control. You can:

  • Index documents predictably with explicit mappings
  • Run full-text and structured queries in the same request
  • Batch ingestion through helpers instead of looping one document at a time
  • Inspect cluster and index metrics when performance starts to drift

That last point matters more than people expect. Search quality often fails because data wasn’t shaped correctly before indexing, not because Elasticsearch is slow.

Search pipelines break at the mapping layer first. Query tuning comes later.

If your upstream system produces rich JSON from PDFs, it helps to understand the difference between extraction and parsing. This short guide on what data parsing means in document workflows is a useful mental model before you design your index.

A practical starting mindset

Treat Elasticsearch as part of your application schema, not as a dumping ground. The client makes it easy to push data fast, but the teams that get clean search results usually spend more time on field types, index design, and query structure than on the first connection snippet.

That’s the difference between a demo and a production search pipeline.

Installation and Handling Version Compatibility

Installing the library is easy. Getting the right library version is where many projects go sideways.

Start with the basic package:

pip install elasticsearch

That command alone isn't enough for production work. The first question should be: what Elasticsearch server version are you connecting to? The Python client tracks major server versions closely, and incompatibilities show up as confusing request errors.

Why version matching matters

This isn’t a niche edge case. Analysis of over 500 recent GitHub issues found that approximately 25% of reported problems came from version mismatches between the Python client and the Elasticsearch server, including breakage around doc_type, which was deprecated along with mapping types in 7.x and removed in 8.0 (v7.13 client API reference).

That lines up with what shows up in real projects. A script written for a 7.x cluster often fails against 8.x or 9.x because parameter names, accepted call styles, and deprecated fields changed.

The compatibility checks that save time

Before writing any indexing code, check your cluster version:

from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")
info = es.info()
print(info["version"]["number"])

Once you know the server major version, align the client major version with it. If your cluster is on 8.x, use an 8.x client. If you’re targeting newer 9.x behavior, don’t assume old examples from 7.x tutorials will still work unchanged.
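
If you want that alignment enforced rather than remembered, a small guard at startup can help. This is a sketch under assumptions: majors_match is a hypothetical helper, and it only compares major versions, which is the rule of thumb described above rather than an official compatibility matrix.

```python
def majors_match(server_version: str, client_version: tuple) -> bool:
    """Return True when the server and client share a major version."""
    server_major = int(server_version.split(".")[0])
    return server_major == client_version[0]


# In a real script the inputs would come from the cluster and the package:
#   server_version = es.info()["version"]["number"]
#   client_version = elasticsearch.__version__   # a tuple such as (8, 15, 1)
print(majors_match("8.15.1", (8, 15, 0)))   # True
print(majors_match("7.17.20", (8, 15, 0)))  # False
```

Failing fast on a mismatch is usually cheaper than debugging a confusing request error three layers into the pipeline.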

A few migration pain points show up repeatedly:

| Version area | What changes in practice | What breaks |
| --- | --- | --- |
| 7.x to 8.x | doc_type usage is deprecated or removed | Old indexing and mapping examples fail |
| 7.x syntax habits | Some examples use older request patterns | Mixed styles make debugging harder |
| 8.x to 9.x usage | Keyword-argument style becomes more important | Positional or inconsistent calls become fragile |

What to do in mixed environments

Many teams don’t control their Elasticsearch upgrade cycle. Finance, compliance, and ERP-connected systems often carry older clusters longer than greenfield apps. In that case, don’t copy a shiny tutorial written for the newest release and hope it works.

Use this checklist instead:

  • Confirm the server version first before choosing package version.
  • Pin the client in requirements so deployments don’t drift.
  • Review deprecated fields early, especially doc_type.
  • Test index creation and one search request first before building the rest of the pipeline.

Practical rule: If the first info(), indices.create(), and search() calls don’t work cleanly in a scratch script, stop there. Don’t build bulk ingestion on top of a broken version pairing.

A sane installation pattern

For teams that care about reproducibility, pin the major version explicitly:

pip install "elasticsearch==8.*"

Or, if your environment is already standardized on another major line, pin that instead. The important part isn’t the exact command. It’s that your application code, deployment environment, and server version move together.

The client has gone through many major versions since its first release, which is a strength, but it also means examples age quickly if you don’t anchor them to the version you run. Treat compatibility as a first-order design concern, not cleanup work.

Connecting and Authenticating to Your Cluster

Once the client version matches the cluster, the next job is connection hygiene. Most search problems in development look like “Elasticsearch is broken” when the actual issue is bad authentication, hardcoded secrets, or a connection object copied from an old blog post.

Elastic Cloud with Cloud ID and API key

For managed deployments, the cleanest setup is usually Cloud ID plus API key:

import os
from elasticsearch import Elasticsearch

es = Elasticsearch(
    cloud_id=os.getenv("ELASTIC_CLOUD_ID"),
    api_key=os.getenv("ELASTIC_API_KEY"),
)

This style keeps the connection string compact and avoids embedding host details throughout the codebase. It also fits modern secret management better than scattered username and password values.

Self-managed cluster with API key

If you run your own cluster, API keys are still the better default:

import os
from elasticsearch import Elasticsearch

es = Elasticsearch(
    hosts=[os.getenv("ELASTICSEARCH_URL")],
    api_key=os.getenv("ELASTICSEARCH_API_KEY"),
)

API key authentication is not new in 8.x. Elasticsearch has offered API keys since the 6.x line, and the api_key parameter is standard in Python client 8.x and 9.x. That makes it the right choice for current enterprise integrations and zero-trust style access control.

Basic auth for local development or legacy systems

You’ll still see basic auth in internal tools and older deployments:

import os
from elasticsearch import Elasticsearch

es = Elasticsearch(
    hosts=[os.getenv("ELASTICSEARCH_URL", "http://localhost:9200")],
    basic_auth=(
        os.getenv("ELASTICSEARCH_USERNAME"),
        os.getenv("ELASTICSEARCH_PASSWORD"),
    ),
)

This is workable for local environments. It’s not the pattern to normalize for production if you have a choice.

The habits that prevent operational mess

A reliable connection block usually has three traits:

  • Secrets live in environment variables, not source files.
  • The client is created once and reused, rather than rebuilt inside every function.
  • Authentication style matches the deployment model, instead of forcing one pattern everywhere.

Here’s a simple check after initialization:

info = es.info()
print(info["cluster_name"])
print(info["version"]["number"])

If es.info() works but indexing fails, the issue usually isn’t connectivity. It’s permissions, mappings, or request shape.
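
One way to keep the authentication style matched to the deployment, without scattering conditionals through the codebase, is to derive the connection arguments from the environment in one place. The sketch below is an illustration: client_kwargs is a hypothetical helper, and the variable names simply reuse the ones from the snippets above.

```python
import os


def client_kwargs(env=os.environ) -> dict:
    """Pick connection arguments based on which variables are set."""
    # Elastic Cloud: Cloud ID plus API key.
    if env.get("ELASTIC_CLOUD_ID"):
        return {"cloud_id": env["ELASTIC_CLOUD_ID"], "api_key": env["ELASTIC_API_KEY"]}

    # Self-managed: explicit host, API key preferred over basic auth.
    kwargs = {"hosts": [env.get("ELASTICSEARCH_URL", "http://localhost:9200")]}
    if env.get("ELASTICSEARCH_API_KEY"):
        kwargs["api_key"] = env["ELASTICSEARCH_API_KEY"]
    elif env.get("ELASTICSEARCH_USERNAME"):
        kwargs["basic_auth"] = (env["ELASTICSEARCH_USERNAME"], env["ELASTICSEARCH_PASSWORD"])
    return kwargs


# Build the client once at startup and pass it to the functions that need it:
#   from elasticsearch import Elasticsearch
#   es = Elasticsearch(**client_kwargs())
```

Because the function only returns a dictionary, it can be unit tested without a running cluster.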

Sync or async

The Python client supports both sync and async styles. For many document indexing pipelines, the synchronous client is easier to reason about and debug. Async can help when you’re integrating search into a larger event-driven system, but it also increases failure modes around retries, connection pooling, and recovery handling.

That trade-off matters more than fashion. If your ingestion path is already parallelized elsewhere, the sync client is often enough.

Indexing Data with Explicit Mappings

If you only remember one rule from this article, keep this one: don’t rely on dynamic mapping for production document data.

Elasticsearch will happily guess field types when you index the first document. That feels convenient for five minutes. Then an amount arrives as a string in one file, a date arrives in an unexpected format, or an array of line items gets flattened in a way that breaks filtering.

For complex JSON, define mappings with text for full-text search, keyword subfields for exact matches, date for timestamps, and nested for arrays like line items. Disabling unused metadata fields can reduce index size by up to 20-30% (practical mapping guide).

[Figure: Elasticsearch indexing strategies, comparing explicit mappings with dynamic mapping]

A realistic document shape

A document extraction pipeline usually produces JSON closer to this than to a toy blog example:

invoice_doc = {
    "document_id": "inv_0001",
    "document_type": "invoice",
    "vendor_name": "Northwind Supplies",
    "vendor_tax_id": "EU123456",
    "invoice_number": "2026-INV-1042",
    "invoice_date": "2026-01-15",
    "due_date": "2026-02-14",
    "currency": "EUR",
    "total_amount": 1840.50,
    "status": "approved",
    "content": "Full extracted text from the PDF",
    "tags": ["finance", "ap", "invoice"],
    "line_items": [
        {
            "description": "Industrial gloves",
            "sku": "GLV-002",
            "quantity": 40,
            "unit_price": 12.50
        },
        {
            "description": "Safety glasses",
            "sku": "GLS-014",
            "quantity": 25,
            "unit_price": 18.00
        }
    ],
    "validation": {
        "source_file": "batch_17.pdf",
        "review_state": "auto_approved"
    }
}

That structure mixes search text, identifiers, dates, monetary values, and object arrays. Each of those needs a deliberate field type.

A mapping that works in practice

mappings = {
    "mappings": {
        "properties": {
            "document_id": {"type": "keyword"},
            "document_type": {"type": "keyword"},
            "vendor_name": {
                "type": "text",
                "fields": {
                    "keyword": {"type": "keyword"}
                }
            },
            "vendor_tax_id": {"type": "keyword"},
            "invoice_number": {"type": "keyword"},
            "invoice_date": {
                "type": "date",
                "format": "yyyy-MM-dd||epoch_millis"
            },
            "due_date": {
                "type": "date",
                "format": "yyyy-MM-dd||epoch_millis"
            },
            "currency": {"type": "keyword"},
            "total_amount": {"type": "float"},
            "status": {"type": "keyword"},
            "content": {"type": "text"},
            "tags": {"type": "keyword"},
            "line_items": {
                "type": "nested",
                "properties": {
                    "description": {"type": "text"},
                    "sku": {"type": "keyword"},
                    "quantity": {"type": "float"},
                    "unit_price": {"type": "float"}
                }
            },
            "validation": {
                "properties": {
                    "source_file": {"type": "keyword"},
                    "review_state": {"type": "keyword"}
                }
            },
            "metadata": {
                "type": "object",
                "enabled": False
            }
        }
    }
}

Why each field type matters

Use this as a decision guide:

  • text fields are for analyzed search. vendor_name and content belong here.
  • keyword fields are for filters, exact matches, sorting, and aggregations.
  • date fields prevent date strings from turning into brittle text comparisons.
  • float fields allow amount and quantity range filters.
  • nested fields preserve relationships inside arrays of objects.

If you skip nested for line items, queries can mix values across different items inside the same invoice. That leads to false matches and very hard-to-explain search behavior.

A search result that looks “almost right” is often worse than an obvious error. It survives longer in production.
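
To see why the false matches happen, it helps to model what a plain object array looks like once it is indexed. The code below is a simplified pure-Python illustration, not Elasticsearch internals: without nested, each sub-field is effectively stored as one list of values per document, so the link between a SKU and its quantity is lost.

```python
def flatten_object_array(items: list) -> dict:
    """Mimic how a plain object array is indexed: one value list per field."""
    flat = {}
    for item in items:
        for key, value in item.items():
            flat.setdefault(key, []).append(value)
    return flat


def flattened_match(items, sku, quantity):
    # Each condition is checked against the whole value list independently.
    flat = flatten_object_array(items)
    return sku in flat.get("sku", []) and quantity in flat.get("quantity", [])


def nested_match(items, sku, quantity):
    # Both conditions must hold within the same line item.
    return any(i.get("sku") == sku and i.get("quantity") == quantity for i in items)


line_items = [
    {"sku": "GLV-002", "quantity": 40},
    {"sku": "GLS-014", "quantity": 25},
]

# "Gloves with quantity 25" does not exist, yet the flattened view matches.
print(flattened_match(line_items, "GLV-002", 25))  # True (false positive)
print(nested_match(line_items, "GLV-002", 25))     # False
```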

Creating the index

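import os
from elasticsearch import Elasticsearch

# Read credentials from the environment rather than hardcoding them
es = Elasticsearch(
    cloud_id=os.getenv("ELASTIC_CLOUD_ID"),
    api_key=os.getenv("ELASTIC_API_KEY"),
)

# The dict's top-level "mappings" key is unpacked into the mappings= argument
es.indices.create(
    index="documents_invoices",
    **mappings
)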

For teams extracting PDFs into structured JSON, this stage is tightly connected to the parser itself. The cleaner your upstream schema is, the easier your mapping becomes. If you’re still working through that part, this guide on parsing PDFs in Python is a good upstream companion.

Dynamic mapping versus explicit mapping

Here’s the trade-off in plain terms:

| Approach | Up front | Over time |
| --- | --- | --- |
| Dynamic mapping | Fast for quick experiments | Inconsistent field types and brittle queries |
| Explicit mapping | Slightly more setup work | Stable filters, better relevance, fewer surprises |

Explicit mapping also makes reindexing and debugging much easier. When an index behaves badly, you can inspect a known schema instead of reverse-engineering what Elasticsearch inferred from the first few documents.
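
Inspecting a known schema can be automated. The sketch below compares the field types you intended with the ones the index reports. mapping_type_mismatches is a hypothetical helper, and it only compares the type of each field, not analyzers or formats.

```python
def mapping_type_mismatches(expected: dict, actual: dict, prefix: str = "") -> list:
    """List (field, expected_type, live_type) where the live mapping differs."""
    problems = []
    for name, spec in expected.items():
        path = f"{prefix}{name}"
        want = spec.get("type", "object")
        live = actual.get(name)
        if live is None:
            problems.append((path, want, None))
            continue
        have = live.get("type", "object")
        if want != have:
            problems.append((path, want, have))
        # Recurse into object and nested sub-fields.
        if "properties" in spec:
            problems += mapping_type_mismatches(
                spec["properties"], live.get("properties", {}), prefix=f"{path}."
            )
    return problems


# Against a live index, the "actual" side would come from something like:
#   live = es.indices.get_mapping(index="documents_invoices")
#   actual = live["documents_invoices"]["mappings"]["properties"]
```

An empty list means the index matches what you designed. Anything else is worth fixing before tuning queries.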

Advanced Searching and Running Aggregations

Once the mapping is solid, search becomes predictable. That’s where Elasticsearch starts paying back the setup cost.

Most business queries aren’t pure full-text search. They combine content relevance with structured filters. “Find invoices mentioning customs fees, from this quarter, above a certain amount, and only from approved suppliers” is a typical example. The Query DSL handles that well when you separate what must match from what should only filter.

Start with a useful bool query

query = {
    "query": {
        "bool": {
            "must": [
                {"match": {"content": "customs fees"}}
            ],
            "filter": [
                {"term": {"document_type": "invoice"}},
                {"term": {"status": "approved"}},
                {"range": {"invoice_date": {"gte": "2026-01-01", "lte": "2026-03-31"}}},
                {"range": {"total_amount": {"gte": 500}}}
            ],
            "should": [
                {"match": {"vendor_name": "northwind"}}
            ]
        }
    },
    "sort": [
        {"invoice_date": {"order": "desc"}}
    ]
}

response = es.search(index="documents_invoices", body=query)

This structure keeps relevance and filtering separate:

  • must affects matching and score
  • filter enforces constraints without scoring overhead
  • should boosts useful matches without making them mandatory

That pattern is usually better than stuffing every condition into must.
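
In application code, these queries are rarely written by hand each time. A small builder keeps the must/filter/should split consistent. This is one possible shape, with a hypothetical function name and parameters chosen to mirror the example above.

```python
def build_invoice_query(text, date_from=None, date_to=None, min_amount=None, vendor_hint=None):
    """Assemble a bool query, adding only the clauses that were requested."""
    # Structured constraints go in filter: no scoring, and they can be cached.
    filters = [{"term": {"document_type": "invoice"}}]
    if date_from or date_to:
        date_range = {}
        if date_from:
            date_range["gte"] = date_from
        if date_to:
            date_range["lte"] = date_to
        filters.append({"range": {"invoice_date": date_range}})
    if min_amount is not None:
        filters.append({"range": {"total_amount": {"gte": min_amount}}})

    # Full-text relevance goes in must; optional boosts go in should.
    bool_query = {"must": [{"match": {"content": text}}], "filter": filters}
    if vendor_hint:
        bool_query["should"] = [{"match": {"vendor_name": vendor_hint}}]
    return {"query": {"bool": bool_query}}


query = build_invoice_query("customs fees", date_from="2026-01-01", min_amount=500)
```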

Querying nested line items correctly

If you need to find invoices containing a specific SKU or line-item description, query the nested field directly:

nested_query = {
    "query": {
        "nested": {
            "path": "line_items",
            "query": {
                "bool": {
                    "must": [
                        {"term": {"line_items.sku": "GLV-002"}}
                    ]
                }
            }
        }
    }
}

response = es.search(index="documents_invoices", body=nested_query)

Without the nested query, Elasticsearch can match unrelated values across different objects inside the same array.
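
The nested query earns its keep once you combine conditions on the same line item. The sketch below (line_item_query is a hypothetical helper) requires the SKU and the quantity to match within one item, and uses inner_hits so the response also reports which line items matched.

```python
def line_item_query(sku: str, min_quantity: float) -> dict:
    """Match invoices where a single line item has this SKU and quantity."""
    return {
        "query": {
            "nested": {
                "path": "line_items",
                "query": {
                    "bool": {
                        "filter": [
                            {"term": {"line_items.sku": sku}},
                            {"range": {"line_items.quantity": {"gte": min_quantity}}},
                        ]
                    }
                },
                # Ask Elasticsearch to return the line items that matched.
                "inner_hits": {},
            }
        }
    }


query = line_item_query("GLV-002", 10)
```

With inner_hits enabled, each hit in the response should carry the matching items under hit["inner_hits"]["line_items"]["hits"]["hits"], which is handy when an invoice has dozens of lines.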

Aggregations for reporting and workflows

Search gets documents back. Aggregations tell you what the dataset is doing.

A terms aggregation on vendors:

agg_query = {
    "size": 0,
    "aggs": {
        "invoices_by_vendor": {
            "terms": {
                "field": "vendor_name.keyword"
            }
        }
    }
}

response = es.search(index="documents_invoices", body=agg_query)

A date histogram for document volume over time:

timeline_query = {
    "size": 0,
    "aggs": {
        "volume_over_time": {
            "date_histogram": {
                "field": "invoice_date",
                "calendar_interval": "month"
            }
        }
    }
}

response = es.search(index="documents_invoices", body=timeline_query)

These are useful for dashboards, exception queues, and operational monitoring. They’re also a fast way to validate that your ingest pipeline is shaping data the way you expect.

If an aggregation on a field looks wrong, check the mapping before touching the query.
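
Aggregation responses are nested dictionaries, so a small helper makes them easier to feed into dashboards or sanity checks. This is a minimal sketch. bucket_counts is a hypothetical name, and the sample response is made-up data that only shows the structure.

```python
def bucket_counts(response, agg_name: str) -> dict:
    """Turn a terms or date_histogram aggregation into {key: doc_count}."""
    buckets = response["aggregations"][agg_name]["buckets"]
    # date_histogram buckets carry a readable key_as_string; terms buckets do not.
    return {b.get("key_as_string", b["key"]): b["doc_count"] for b in buckets}


# Illustrative response fragment, not real output.
sample_response = {
    "aggregations": {
        "invoices_by_vendor": {
            "buckets": [
                {"key": "Northwind Supplies", "doc_count": 12},
                {"key": "Blue Harbor Logistics", "doc_count": 5},
            ]
        }
    }
}

print(bucket_counts(sample_response, "invoices_by_vendor"))
```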

Deep pagination without hurting the cluster

For result sets beyond 10,000 hits (the default index.max_result_window), use search_after with a stable sort rather than the Scroll API, which Elastic no longer recommends for deep pagination. On large log sets above 100 million documents, one write-up reported 98-99% completion with this method, compared with 60% failure rates for Scroll at scale (deep pagination write-up).

A practical pattern looks like this:

query = {
    "query": {
        "term": {"document_type": "invoice"}
    },
    "sort": [
        {"invoice_date": "asc"},
        {"document_id": "asc"}
    ],
    "size": 1000
}

response = es.search(index="documents_invoices", body=query)

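hits = response["hits"]["hits"]

while hits:
    # Handle the current page here (export, reindex, validate, ...)

    # Resume from the sort values of the last hit on this page
    last_sort = hits[-1]["sort"]
    query["search_after"] = last_sort
    response = es.search(index="documents_invoices", body=query)
    hits = response["hits"]["hits"]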

The key detail is the stable composite sort. If you page extensively without deterministic sorting, retrieval gets unreliable fast.
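
The same loop is easier to reuse as a generator. The sketch below is an assumption-laden illustration: iter_hits is a hypothetical helper, and it takes the search function as an argument (for example es.search) so the paging logic can be tested without a cluster.

```python
def iter_hits(search, index, query, page_size=1000):
    """Yield every hit for a sorted query, paging with search_after.

    `search` is any callable shaped like es.search(index=..., body=...).
    """
    if "sort" not in query:
        raise ValueError("search_after needs an explicit, deterministic sort")
    body = dict(query, size=page_size)  # copy so the caller's dict is untouched
    while True:
        hits = search(index=index, body=body)["hits"]["hits"]
        if not hits:
            return
        yield from hits
        # Resume after the sort values of the last hit on this page.
        body["search_after"] = hits[-1]["sort"]


# Usage against a real cluster might look like:
#   for hit in iter_hits(es.search, "documents_invoices", query):
#       handle(hit["_source"])   # handle() is a placeholder for your own code
```

If the index receives writes while you page, Elasticsearch's point in time (PIT) feature is worth pairing with search_after so every page sees the same snapshot.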

Bulk Ingestion and Performance Scaling Tips

Your first load test usually fails in a predictable way. The mapping is fine, queries are fine, but the ingest worker is still calling index() once per document and spends more time on HTTP overhead than on actual indexing. That pattern shows up fast with AI-extracted document payloads, especially if you are pushing nested JSON from tools like Matil.ai document processing workflows instead of flat rows.

Bulk ingestion should be the default for any real pipeline. Elasticsearch handles batched writes far better than a stream of tiny requests, and the Python client gives you the right primitives to do it without building the whole payload in memory first.

A clean bulk pattern

from elasticsearch.helpers import bulk

def generate_actions(docs, index_name):
    for doc in docs:
        yield {
            "_index": index_name,
            "_id": doc["document_id"],
            "_source": doc
        }

bulk(es, generate_actions(invoice_docs, "documents_invoices"))

A generator keeps memory usage predictable. It also fits the way production ingest systems usually work, reading from object storage, queues, or ETL batches rather than assembling one massive Python list.

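One warning from experience. Do not treat helpers.bulk() as fire-and-forget. Partial failures are common when one bad document slips into an otherwise valid batch. By default the helper raises BulkIndexError when any item fails, so either catch that exception or pass raise_on_error=False and inspect the errors it returns.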

The tuning choices that actually matter

Three settings affect throughput more than small code tweaks:

  • Chunk size controls request size and memory pressure. Start conservatively and test with your real document shape, not toy examples.
  • Refresh policy changes the trade-off between search visibility and write speed. Frequent refreshes make debugging easier and bulk loads slower.
  • Error handling determines whether a bad batch is recoverable or turns into silent data loss.

If your documents come from AI extraction, batch sizing gets trickier. A thousand short records and a thousand large nested documents are very different workloads, even if the document count matches. Size chunks based on payload volume and cluster behavior, not just number of docs.
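
To make "size chunks based on payload volume" concrete, here is a sketch that groups documents by serialized size as well as by count. chunk_by_bytes is a hypothetical helper written for illustration. The bulk helpers themselves also accept chunk_size and max_chunk_bytes, which is usually the simpler route.

```python
import json


def chunk_by_bytes(docs, max_bytes=5_000_000, max_docs=500):
    """Group documents into batches capped by serialized size and by count."""
    batch, batch_bytes = [], 0
    for doc in docs:
        size = len(json.dumps(doc).encode("utf-8"))
        # Close the current batch if adding this document would exceed a cap.
        if batch and (batch_bytes + size > max_bytes or len(batch) >= max_docs):
            yield batch
            batch, batch_bytes = [], 0
        batch.append(doc)
        batch_bytes += size
    if batch:
        yield batch


# Five documents of 115 bytes each, with a deliberately tiny byte cap.
docs = [{"content": "x" * 100} for _ in range(5)]
print([len(b) for b in chunk_by_bytes(docs, max_bytes=250)])  # [2, 2, 1]
```

A single document larger than the cap still goes out in a batch of its own, so oversized payloads need a separate check upstream.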

Example: stream batches and inspect failures

from elasticsearch.helpers import streaming_bulk

def generate_actions(docs, index_name):
    for doc in docs:
        yield {
            "_index": index_name,
            "_id": doc["document_id"],
            "_source": doc
        }

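for ok, result in streaming_bulk(
    es,
    generate_actions(invoice_docs, "documents_invoices"),
    chunk_size=500,
    max_retries=3,          # retries apply to items rejected with HTTP 429
    raise_on_error=False,   # yield failed items instead of raising BulkIndexError
):
    if not ok:
        print("Bulk item failed:", result)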

streaming_bulk() is often the better choice in production because you can inspect failures as they happen instead of waiting for one large response object at the end. That matters when you are dealing with mapping mismatches, rejected writes, or version conflicts.

Patterns that cause trouble

| Pattern | What goes wrong | Better option |
| --- | --- | --- |
| Looping index() for every doc | Excess request overhead and lower throughput | helpers.bulk() or streaming_bulk() |
| Sending mixed document shapes into one index | Mapping conflicts and rejected documents | Normalize fields before indexing and use explicit mappings |
| Refreshing on every batch | Lower ingest throughput | Refresh less often during large loads |
| Ignoring per-item bulk errors | Missing documents with no clear recovery path | Log failed items and requeue them |
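
For the last row, "log failed items and requeue them" could look like the sketch below. It assumes the (ok, item) pairs that streaming_bulk() yields when raise_on_error=False, where each item is keyed by its action type. summarize_bulk is a hypothetical helper, and the sample results are made up.

```python
def summarize_bulk(results):
    """Consume (ok, item) pairs and separate successes from failures."""
    indexed, failed = 0, []
    for ok, item in results:
        if ok:
            indexed += 1
            continue
        # Each item is keyed by its action type, e.g. {"index": {...}}.
        action, detail = next(iter(item.items()))
        failed.append(
            {"id": detail.get("_id"), "action": action, "error": detail.get("error")}
        )
    return indexed, failed


# Illustrative results, shaped like the helper's per-item output.
sample_results = [
    (True, {"index": {"_id": "inv_001", "status": 201}}),
    (False, {"index": {"_id": "inv_002", "status": 400,
                       "error": {"type": "mapper_parsing_exception"}}}),
]
indexed, failed = summarize_bulk(sample_results)
print(indexed, [f["id"] for f in failed])  # 1 ['inv_002']
```

The failed list is what you would write to a dead-letter queue or a retry table, keyed by document ID.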

Production advice for scaling past the tutorial stage

Keep document IDs deterministic when you can. That gives you idempotent retries and makes reruns much safer. Clean null-like values before indexing, especially if upstream systems switch between empty strings, missing keys, and nested empty objects. Be strict about date parsing before the payload reaches Elasticsearch. Version differences across clusters also matter here. A bulk script that behaves one way against a 7.x environment may need small but important adjustments against newer client and server combinations.
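
Two of those habits fit in a few lines. The sketch below is illustrative: clean_nulls and deterministic_id are hypothetical helpers, and building the ID from vendor_tax_id plus invoice_number is an assumption about what uniquely identifies an invoice in your data.

```python
import hashlib

_EMPTY = (None, "", {}, [])


def clean_nulls(value):
    """Recursively drop None, empty strings, and empty containers."""
    if isinstance(value, dict):
        cleaned = {k: clean_nulls(v) for k, v in value.items()}
        return {k: v for k, v in cleaned.items() if v not in _EMPTY}
    if isinstance(value, list):
        cleaned = [clean_nulls(v) for v in value]
        return [v for v in cleaned if v not in _EMPTY]
    return value


def deterministic_id(doc: dict) -> str:
    """Derive a stable _id from the fields that identify the document."""
    key = f'{doc["vendor_tax_id"]}|{doc["invoice_number"]}'
    return hashlib.sha1(key.encode("utf-8")).hexdigest()


raw = {"a": "", "b": None, "c": {"d": {}}, "e": [None, "x"], "f": 0}
print(clean_nulls(raw))  # {'e': ['x'], 'f': 0}
```

Zero and False survive the cleanup on purpose, since a quantity of 0 is data, not a missing value. Re-running the same batch produces the same IDs, so retries overwrite documents instead of duplicating them.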

The boring pipeline usually wins. Batch writes, validate document shape before send, capture item-level failures, and tune chunk sizes with realistic payloads. That is how you keep an Elasticsearch ingest job stable after the data volume stops being small.

Full Example Ingesting Matil.ai Document Data

A good tutorial should end with something you can adapt directly. The script below creates an index, applies explicit mappings, bulk ingests document JSON, and runs a couple of useful searches.

On standard hardware, helpers.bulk() can reach 10,000+ documents per second, roughly a 10x improvement over looped single-document API calls for large unstructured datasets, though actual throughput depends on document size, mappings, and cluster resources (Elasticsearch indices stats reference).

End-to-end example

import os
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

INDEX_NAME = "matil_documents"

def get_client():
    return Elasticsearch(
        cloud_id=os.getenv("ELASTIC_CLOUD_ID"),
        api_key=os.getenv("ELASTIC_API_KEY"),
    )

MAPPINGS = {
    "mappings": {
        "properties": {
            "document_id": {"type": "keyword"},
            "document_type": {"type": "keyword"},
            "vendor_name": {
                "type": "text",
                "fields": {
                    "keyword": {"type": "keyword"}
                }
            },
            "invoice_number": {"type": "keyword"},
            "invoice_date": {
                "type": "date",
                "format": "yyyy-MM-dd||epoch_millis"
            },
            "total_amount": {"type": "float"},
            "currency": {"type": "keyword"},
            "status": {"type": "keyword"},
            "content": {"type": "text"},
            "tags": {"type": "keyword"},
            "line_items": {
                "type": "nested",
                "properties": {
                    "description": {"type": "text"},
                    "sku": {"type": "keyword"},
                    "quantity": {"type": "float"},
                    "unit_price": {"type": "float"}
                }
            },
            "validation": {
                "properties": {
                    "review_state": {"type": "keyword"},
                    "source_file": {"type": "keyword"}
                }
            }
        }
    }
}

def ensure_index(es):
    if not es.indices.exists(index=INDEX_NAME):
        es.indices.create(index=INDEX_NAME, **MAPPINGS)

def generate_actions(documents):
    for doc in documents:
        yield {
            "_index": INDEX_NAME,
            "_id": doc["document_id"],
            "_source": doc
        }

def ingest_documents(es, documents):
    bulk(es, generate_actions(documents))

def search_vendor(es, vendor_name):
    body = {
        "query": {
            "bool": {
                "must": [
                    {"match": {"vendor_name": vendor_name}}
                ],
                "filter": [
                    {"term": {"document_type": "invoice"}}
                ]
            }
        }
    }
    return es.search(index=INDEX_NAME, body=body)

def aggregate_by_vendor(es):
    body = {
        "size": 0,
        "aggs": {
            "vendors": {
                "terms": {
                    "field": "vendor_name.keyword"
                }
            }
        }
    }
    return es.search(index=INDEX_NAME, body=body)

if __name__ == "__main__":
    es = get_client()
    ensure_index(es)

    sample_docs = [
        {
            "document_id": "inv_001",
            "document_type": "invoice",
            "vendor_name": "Northwind Supplies",
            "invoice_number": "INV-001",
            "invoice_date": "2026-01-12",
            "total_amount": 1840.50,
            "currency": "EUR",
            "status": "approved",
            "content": "Invoice for industrial gloves and safety glasses",
            "tags": ["invoice", "finance"],
            "line_items": [
                {
                    "description": "Industrial gloves",
                    "sku": "GLV-002",
                    "quantity": 40,
                    "unit_price": 12.50
                }
            ],
            "validation": {
                "review_state": "auto_approved",
                "source_file": "batch_17.pdf"
            }
        },
        {
            "document_id": "inv_002",
            "document_type": "invoice",
            "vendor_name": "Blue Harbor Logistics",
            "invoice_number": "INV-002",
            "invoice_date": "2026-01-18",
            "total_amount": 920.00,
            "currency": "EUR",
            "status": "reviewed",
            "content": "Freight and customs handling charges",
            "tags": ["invoice", "logistics"],
            "line_items": [
                {
                    "description": "Freight handling",
                    "sku": "FR-100",
                    "quantity": 1,
                    "unit_price": 920.00
                }
            ],
            "validation": {
                "review_state": "manual_review",
                "source_file": "batch_18.pdf"
            }
        }
    ]

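    ingest_documents(es, sample_docs)
    # Make the new documents visible to search right away. Fine for a demo;
    # avoid forcing refreshes during large production loads.
    es.indices.refresh(index=INDEX_NAME)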

    vendor_results = search_vendor(es, "Northwind")
    print(vendor_results["hits"]["hits"])

    agg_results = aggregate_by_vendor(es)
    print(agg_results["aggregations"]["vendors"]["buckets"])

Why this pattern holds up

This script does a few things right:

  • It creates the index before ingestion instead of trusting dynamic mapping.
  • It uses deterministic document IDs so reprocessing can be controlled.
  • It separates indexing and querying logic into small functions.
  • It uses bulk ingestion instead of single-document loops.

If you need to turn extracted PDFs and mixed business documents into a searchable operational layer, that’s the shape to aim for.

A short product overview can help if you’re evaluating the upstream side of that pipeline. See Matil’s document automation platform for an example of how structured JSON is produced before Elasticsearch becomes the search layer.

Final advice before you ship

Don’t judge the Elasticsearch Python API by how fast you can get a demo running. Judge it by how stable your mappings are, how predictable your queries stay after schema drift, and how calmly your ingestion pipeline behaves under load.

The teams that get the most from Elasticsearch usually keep the design simple. They model documents carefully, batch writes, avoid magical defaults, and treat search relevance as part of the data contract.


If you're evaluating how to automate document extraction before indexing, you can explore Matil. It combines OCR, classification, validation, and workflow automation in a single API, supports pre-trained and customizable models, delivers above 99% accuracy in multiple use cases, and is built for enterprise environments with GDPR, ISO 27001, SOC-aligned security, zero data retention, and an SLA of over 99.99% availability.

© 2026 Matil