Visual Data Flow 11: Apache NiFi

user1272047 · 3 min read

Here are three examples of Apache NiFi flows, along with qualitative and technical explanations for each. The XML snippets are simplified illustrations of each processor's key properties, not complete flow definitions:


Example 1: Ingesting Data from a CSV File

Qualitative Explanation:

  1. Purpose: Automate the ingestion of data from a CSV file into a database.

  2. Use Case: ETL (Extract, Transform, Load) workflows for data integration.

  3. Outcome: Data from the CSV file is processed and stored in a database.

Technical Explanation:

  1. Create Processor: Use GetFile to read the CSV file.

     <processor class="org.apache.nifi.processors.standard.GetFile">
         <property name="Input Directory">/path/to/csv</property>
     </processor>
    
  2. Transform Data: Use ConvertRecord to parse CSV into JSON.

     <processor class="org.apache.nifi.processors.standard.ConvertRecord">
         <property name="Record Reader">CSVReader</property>
         <property name="Record Writer">JsonRecordSetWriter</property>
     </processor>
    
  3. Load Data: Use PutDatabaseRecord to insert the records into a database. The JDBC URL (e.g. jdbc:mysql://localhost:3306/mydb) is configured on the DBCPConnectionPool controller service that the processor references.

     <processor class="org.apache.nifi.processors.standard.PutDatabaseRecord">
         <property name="Database Connection Pooling Service">DBCPConnectionPool</property>
         <property name="Table Name">mytable</property>
     </processor>
    
  4. Error Handling: Route each processor's failure relationship to LogAttribute to log errors.

     <processor class="org.apache.nifi.processors.standard.LogAttribute">
         <property name="Log Level">error</property>
     </processor>
    
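Outside NiFi, the same pipeline logic can be sketched in plain Python. This is a minimal illustration, not the NiFi implementation: sqlite3 stands in for MySQL, and the table schema and sample data mirror the input/output example further below.

```python
import csv
import io
import json
import sqlite3

def ingest_csv(csv_text, conn):
    """Parse CSV rows into JSON-like records and insert them into a table."""
    records = [dict(row) for row in csv.DictReader(io.StringIO(csv_text))]
    conn.execute("CREATE TABLE IF NOT EXISTS mytable (id TEXT, name TEXT, age TEXT)")
    conn.executemany(
        "INSERT INTO mytable (id, name, age) VALUES (:id, :name, :age)", records
    )
    return records

conn = sqlite3.connect(":memory:")
records = ingest_csv("id,name,age\n1,Alice,30\n2,Bob,25\n", conn)
print(json.dumps(records))
```

The intermediate `records` list corresponds to the JSON produced by ConvertRecord, and the `executemany` call plays the role of PutDatabaseRecord.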

Example 2: Real-Time Data Streaming with Kafka

Qualitative Explanation:

  1. Purpose: Stream real-time data from Kafka topics for processing.

  2. Use Case: Real-time analytics and monitoring of streaming data.

  3. Outcome: Data from Kafka is processed and routed to downstream systems.

Technical Explanation:

  1. Consume Kafka: Use ConsumeKafka to read data from Kafka.

     <processor class="org.apache.nifi.processors.kafka.pubsub.ConsumeKafka">
         <property name="Kafka Brokers">localhost:9092</property>
         <property name="Topic Name">mytopic</property>
     </processor>
    
  2. Transform Data: Use JoltTransformJSON to modify JSON data. A chained Jolt specification is a JSON array of operations:

     <processor class="org.apache.nifi.processors.standard.JoltTransformJSON">
         <property name="Jolt Specification">[{"operation":"shift","spec":{"foo":"bar"}}]</property>
     </processor>
    
  3. Route Data: Use RouteOnAttribute to route FlowFiles based on their attribute values. Each dynamic property defines a named route; the Expression Language predicate below is illustrative.

     <processor class="org.apache.nifi.processors.standard.RouteOnAttribute">
         <property name="Routing Strategy">Route to Property name</property>
         <property name="valid">${kafka.topic:equals('mytopic')}</property>
     </processor>
    
  4. Store Data: Use PutHDFS to store data in Hadoop.

     <processor class="org.apache.nifi.processors.hadoop.PutHDFS">
         <property name="Hadoop Configuration Resources">/path/to/hadoop/conf</property>
         <property name="Directory">/data/output</property>
     </processor>
    
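To make the Jolt step concrete, here is a minimal Python sketch of what a "shift" operation does, handling flat keys only (real Jolt also supports nesting, wildcards, and other operations):

```python
def jolt_shift(data, spec):
    """Minimal Jolt-style 'shift': move each input key's value to the
    output key named by the spec (flat keys only, for illustration)."""
    out = {}
    for key, target in spec.items():
        if key in data:
            out[target] = data[key]
    return out

# The spec {"foo": "bar"} moves the value of input key "foo" to output key "bar".
result = jolt_shift({"foo": "bar"}, {"foo": "bar"})
print(result)  # → {'bar': 'bar'}
```

Note that shift renames keys; it does not swap a key with its value.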

Example 3: Data Enrichment with REST API

Qualitative Explanation:

  1. Purpose: Enrich data by calling a REST API for additional information.

  2. Use Case: Enhancing datasets with external data sources.

  3. Outcome: Data is enriched with additional attributes from the API.

Technical Explanation:

  1. Fetch Data: Use InvokeHTTP to call a REST API.

     <processor class="org.apache.nifi.processors.standard.InvokeHTTP">
         <property name="HTTP Method">GET</property>
         <property name="Remote URL">http://api.example.com/data</property>
     </processor>
    
  2. Merge Data: Use MergeRecord to combine API responses with the original records (MergeContent concatenates raw bytes and has no JSON merge format; MergeRecord merges record-oriented data via a reader and writer).

     <processor class="org.apache.nifi.processors.standard.MergeRecord">
         <property name="Record Reader">JsonTreeReader</property>
         <property name="Record Writer">JsonRecordSetWriter</property>
     </processor>
    
  3. Transform Data: Use ReplaceText with a regular expression to append the new field to each JSON object (replacing the entire content with a literal would discard the original fields).

     <processor class="org.apache.nifi.processors.standard.ReplaceText">
         <property name="Search Value">\}$</property>
         <property name="Replacement Value">,"newField":"value"}</property>
         <property name="Replacement Strategy">Regex Replace</property>
     </processor>
    
  4. Store Data: Use PutFile to save enriched data.

     <processor class="org.apache.nifi.processors.standard.PutFile">
         <property name="Directory">/path/to/output</property>
     </processor>
    
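The enrichment merge can be sketched in plain Python. The HTTP call itself (InvokeHTTP's job) is omitted here; the response string is an illustrative stand-in for what the API would return.

```python
import json

def merge_enrichment(record, api_response):
    """Merge a JSON API response into the original record
    (API fields win on key collision)."""
    return {**record, **json.loads(api_response)}

enriched = merge_enrichment({"id": 1, "name": "Alice"}, '{"newField": "value"}')
print(json.dumps(enriched))
```

This mirrors the input/output example below: the original record keeps its fields and gains the attributes returned by the API.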

Input and Output Examples

Example 1: Ingesting Data from a CSV File

  • Input (input.csv):

      id,name,age
      1,Alice,30
      2,Bob,25
    
  • Output (Database Table):

      id | name  | age
      1  | Alice | 30
      2  | Bob   | 25
    

Example 2: Real-Time Data Streaming with Kafka

  • Input (Kafka Topic):

      {"foo":"bar"}
    
  • Output (HDFS):

      {"bar":"bar"}
    

Example 3: Data Enrichment with REST API

  • Input (Original Data):

      {"id":1,"name":"Alice"}
    
  • Output (Enriched Data):

      {"id":1,"name":"Alice","newField":"value"}
    
