Ingest CSV Files into Elasticsearch Using Logstash

Nauval Shidqi

Logstash is a versatile data pipeline tool that simplifies the ingestion of structured data into Elasticsearch. In this tutorial, we'll demonstrate how to load a CSV file into Elasticsearch using Logstash. We'll use an example CSV file and provide a detailed explanation of the logstash.conf file.


1. Prerequisites

Ensure you have the following before starting:

  • Logstash installed on your machine. You can download it from the Elastic website. This tutorial uses Logstash 8.16.2; you can verify your installation with the commands shown after this list.

  • An Elasticsearch instance running. For testing, use a local setup or a cloud-based Elasticsearch service.

  • A sample CSV file (we’ll provide an example below).
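
💡 A quick way to confirm your environment is ready is to check the Logstash version and make sure Elasticsearch is reachable. The host, username, and password below are placeholders; adjust them to match your setup.

bin/logstash --version
curl -k -u yourusername:yourpassword https://192.168.0.1:9200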


2. Sample CSV File

Here’s a sample CSV file named customers.csv:

"customerNo","customerTitle","customerName","isActive","address","mobilePhone","officePhone","email","city","subdistrict","district","postalCode","province"
"SF32712899","Mrs","Customer 1698","False","Address 98, City Area 10","08012300001","0215533244","customer512@example.com","SLEMAN","Kelurahan 20","Kecamatan 12","31061","Jawa Barat"
"SF28882137","Ms","Customer 8645","True","Address 37, City Area 6","08012300002","0213941646","customer942@example.com","YOGYAKARTA","Kelurahan 17","Kecamatan 2","70013","Jawa Timur"
"SF15142460","Mr","Customer 7774","False","Address 15, City Area 9","08012300003","0214523872","customer289@example.com","YOGYAKARTA","Kelurahan 18","Kecamatan 9","67505","DKI Jakarta"
"SF15941666","Mrs","Customer 2768","True","Address 87, City Area 7","08012300004","0213445510","customer575@example.com","YOGYAKARTA","Kelurahan 18","Kecamatan 20","83681","Jawa Barat"
"SF22876437","Mrs","Customer 2447","False","Address 58, City Area 7","08012300005","0215257399","customer722@example.com","SLEMAN","Kelurahan 2","Kecamatan 12","14172","Jawa Timur"

Save this file to your desired location, for example:
/home/ubuntu/documents/esearch/customers.csv

💡 Before running Logstash, make sure the user running Logstash can read the file; otherwise the file input will fail with permission errors. You can set suitable permissions with the following command:

chmod 755 /home/ubuntu/documents/esearch/customers.csv
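
You can confirm the result with ls -l; after chmod 755 the listing should show -rwxr-xr-x, meaning the file is readable by every user, including the one running Logstash:

ls -l /home/ubuntu/documents/esearch/customers.csv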

3. Logstash Configuration

Here’s the logstash.conf file you will use to process the CSV file and load it into Elasticsearch. Each part of the configuration is explained with inline comments:

input {
  file {
    # Specify the path to the CSV file
    path => "/home/ubuntu/documents/esearch/customers.csv"
    # Read the file from the beginning
    start_position => 'beginning'
    # Disable sincedb to prevent file state persistence (useful for testing)
    sincedb_path => "/dev/null"    
    # Set file encoding to UTF-8
    codec => plain { charset => 'UTF-8' }
  }
}

filter {
  csv {
    # Map the CSV columns to Logstash fields
    columns => [
      "customerNo","customerTitle","customerName","isActive","address","mobilePhone","officePhone","email","city","subdistrict","district","postalCode","province"
    ]
    # Define the CSV separator and quote character
    separator => ','
    quote_char => '"'
    # Skip the header row in the CSV file
    skip_header => true
  }
  mutate {
    # Convert the `isActive` field from string to boolean
    convert => {
      "[isActive]" => "boolean"
    }
  }
  mutate {
    # Remove unnecessary fields from the output
    remove_field => ['host', 'message', 'path']
  }
}

output {
  elasticsearch { 
    # Specify Elasticsearch host and authentication
    hosts => ['https://192.168.0.1:9200']
    user => 'yourusername'
    password => 'yourpassword'
    # api_key => 'no8n95MBXkXfnqgkW:XCPxxyvhSjaLvkI5aFA' # Auth using API Key
    # Define the target index for the data
    index => 'customer'
    # Disable SSL certificate verification
    ssl_verification_mode => 'none'
    # Use the `customerNo` field as the unique document ID
    document_id => '%{[customerNo]}'
    # Disable data streams for this output
    data_stream => false
  }
  # Print the output to the console for debugging
  stdout { codec => rubydebug }
}

Key Features:

  1. Input: Reads the CSV file from the beginning and decodes it as UTF-8 (the file itself should be saved as UTF-8 without a BOM, otherwise the BOM ends up in the first field).

  2. Filter: Parses the CSV content, maps the fields, converts data types, and cleans up unnecessary fields.

  3. Output: Sends the processed data to Elasticsearch, using customerNo as the document ID and printing the output to the console for verification.
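
💡 If you prefer the commented-out api_key authentication over a username and password, you can create a key from Kibana Dev Tools. The name logstash-csv-ingest below is just an example; the value to put in the api_key setting is the returned id and api_key joined by a colon (id:api_key).

POST /_security/api_key
{
  "name": "logstash-csv-ingest"
}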

💡 Example configuration for Windows OS

# Specify the path to the CSV file
path => 'C:/users/nauval/documents/logstash/customers.csv'
# Disable sincedb to prevent file state persistence (useful for testing)
sincedb_path => 'NUL'

💡 Example configuration for defining a persistent sincedb_path, so Logstash remembers which lines it has already read between runs

# Specify the path to the CSV file
path => "/user/nauval/logstash/data_input/*.csv"
# Read the file from the beginning
start_position => 'beginning'
# File sincedb and its directory must be writable
sincedb_path => "/user/nauval/logstash/data_input/.sincedb_customer"

4. Running Logstash

  1. Save the logstash.conf file in the Logstash configuration directory.

  2. Open a terminal, navigate to the Logstash directory, and run:

     bin/logstash -f logstash.conf
    
  3. Logstash will process the CSV file and ingest the data into Elasticsearch.
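
💡 Before starting the pipeline in step 2, you can optionally ask Logstash to validate the configuration syntax and exit without processing any data:

bin/logstash -f logstash.conf --config.test_and_exit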


5. Verifying the Data

  1. Open Kibana (or any client that can query Elasticsearch, such as curl).

  2. Query the customer index to verify that the data has been successfully ingested.

Example Kibana query:

GET /customer/_search

Example response:
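
The exact values and timings depend on your data and cluster, but because document_id is set to customerNo, each hit’s _id matches the CSV’s customerNo and the _source mirrors the CSV columns. An abridged, illustrative response for the sample data looks like this:

{
  "took": 3,
  "timed_out": false,
  "hits": {
    "total": { "value": 5, "relation": "eq" },
    "hits": [
      {
        "_index": "customer",
        "_id": "SF32712899",
        "_source": {
          "customerNo": "SF32712899",
          "customerTitle": "Mrs",
          "customerName": "Customer 1698",
          "isActive": false,
          "city": "SLEMAN",
          "province": "Jawa Barat"
        }
      }
    ]
  }
}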


📤 Export Elasticsearch Documents to a CSV File Using Logstash

⏰ Add timezone in config/jvm.options

-Duser.timezone=Asia/Jakarta

⚙️ Logstash configuration logstash-export.conf

input {
  elasticsearch {
    hosts => ['https://192.168.0.101:9200']
    index => 'equipment'
    user => 'yourusername'
    password => 'yourpassword'
    ssl_verification_mode => 'none'
    query => '{ "query": { "match_all": {} } }'
  }
}

output {
  csv {
    path => "C:/users/nauvalsh/logstash/equipment-logstash-%{+YYYY-MM-dd_HH-mm-ss}.csv"
    fields => ["equipmentNo", "policeRegNo", "engineNo", "materialNo"]
    csv_options => {
      "col_sep" => ","
      "force_quotes" => true
      "headers" => true
    }
  }

  stdout {
    codec => rubydebug
  }
}

🚀 Execute the logstash-export.conf file

bin/logstash -f logstash-export.conf

6. Conclusion

Logstash makes it easy to ingest structured CSV data into Elasticsearch. By defining a simple configuration file, you can parse, transform, and load your data with minimal effort. This workflow is highly customizable, allowing you to adapt it to your specific use case.
