Ingest pipelines

littlenoteslittlenotes
3 min read

Ingest pipelines trong Elasticsearch cho phép chúng ta thực hiện một số thay đổi trên data trước khi đưa vào từng index. Chúng ta có thể thêm hay xóa field, extract data từ text field,…
Một pipeline bao gồm các task được gọi là processor. Mỗi processor sẽ thực hiện thay đổi cụ thể trên data đầu vào. Sau khi processor xử lý xong thì Elasticsearch sẽ đưa các data này vào trong từng index

Ingest pipeline diagram

Ta sẽ làm một ví dụ về việc sử dụng ingest pipeline.
Giả sử ta có các sản phẩm products có format như sau:

{
  "id": 637372,
  "name": "Ronald Martin",
  "category": "clothing",
  "price": 319.47,
  "stock": 60,
  "created_at": "2024-05-04 14:32:40",
  "description": """Change black effort author point this. Hope family both number month TV ability market. Image day key current father land address certain.
Student develop language vote general control view.""",
  "tags": [
    "raise",
    "easy",
    "yeah"
  ],
  "rating": 3.6,
  "is_available": true
}

Tuy nhiên, hệ thống của tôi không cần điểm rating chi tiết mà chỉ muốn biết là sản phẩm nào bị chê, nên tôi muốn xóa field rating đi và thêm vào field complaint

Khi đó index mapping các fields của tôi có dạng

PUT /products
{
  "mappings": {
    "properties": {
      "id": {
        "type": "integer"
      },
      "name": {
        "type": "text"
      },
      "category": {
        "type": "keyword"
      },
      "price": {
        "type": "float"
      },
      "stock": {
        "type": "integer"
      },
      "created_at": {
        "type": "date",
        "format": "yyyy-MM-dd HH:mm:ss"
      },
      "description": {
        "type": "text"
      },
      "tags": {
        "type": "keyword"
      },
      "complaint": {
        "type": "boolean"
      },
      "is_available": {
        "type": "boolean"
      }
    }
  }
}

Và tôi sẽ tạo một pipeline để xử lý biến đổi từ field rating của data đầu vào sang thành field complaint trong index products của Elasticsearch. Lúc này chúng ta sẽ sử dụng API ingest để tạo pipeline như sau:

PUT _ingest/pipeline/products-pipeline
{
  "description": "Complaint detect",
  "processors": [
    {
      "script": {
        "lang": "painless",
        "source": "if(ctx.containsKey('rating') && ctx.rating != null && ctx.rating < 3) { ctx.complaint = true; } else { ctx.complaint = false; }"
      }
    },
    {
      "remove": {
        "field": "rating",
        "ignore_missing": true
      }
    }
  ]
}

Mỗi khi insert hoặc update data, ta truyền parameter ?pipeline=products-pipeline để áp dụng pipeline này

PUT products/_bulk?pipeline=products-pipeline
{"create":{}}
{"id":637372,"name":"Ronald Martin","category":"clothing","price":319.47,"stock":60,"created_at":"2024-05-04 14:32:40","description":"Change black effort author point this. Hope family both number month TV ability market. Image day key current father land address certain. Student develop language vote general control view.","tags":["raise","easy","yeah"],"rating":3.6,"is_available":true}

Lúc này trong index field complaint được insert vào chứ không phải field rating từ raw data.

GET /products/_search
{
  "query": {
    "term": {
      "id": {
        "value": "637372"
      }
    }
  }
}
--------------------------------------------------------------------------------------------------
{
  "took": 0,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 1,
      "relation": "eq"
    },
    "max_score": 1,
    "hits": [
      {
        "_index": "ps",
        "_id": "SOY05pUB26wiOSF8AjKr",
        "_score": 1,
        "_source": {
          "complaint": false,
          "price": 319.47,
          "name": "Ronald Martin",
          "created_at": "2024-05-04 14:32:40",
          "description": "Change black effort author point this. Hope family both number month TV ability market. Image day key current father land address certain. Student develop language vote general control view.",
          "id": 637372,
          "category": "clothing",
          "stock": 60,
          "tags": [
            "raise",
            "easy",
            "yeah"
          ],
          "is_available": true
        }
      }
    ]
  }
}

Tham khảo: https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest.html

0
Subscribe to my newsletter

Read articles from littlenotes directly inside your inbox. Subscribe to the newsletter, and don't miss out.

Written by

littlenotes
littlenotes