Guide written by a novice creator on how to migrate an Airbyte connector to Low-code

Shivam Agarwal
15 min read

This is a guide written by a novice developer who embarked on a journey into the open-source community with the support of his comrades.

Hola folks,

This is Shivam Agarwal, and I work as a data engineer. I'm excited to present to you a guide that also encapsulates my personal experience. This guide chronicles my journey of attempting to migrate an existing Airbyte source connector from Python CDK to Low-Code CDK, with Openweather being my chosen starting point. But before delving into the specifics of my endeavor, let's first gain a clear understanding of what Airbyte is.

What is Airbyte?

So, as per the official website:

Airbyte is an open-source data integration engine that helps you consolidate your data in your data warehouses, lakes and databases.

What I believe Airbyte to be:

Airbyte is a tool that helps data engineers and other data professionals cut out the stress associated with handling data.

Now we have the gist of the tool (I hope the one-liners did the job :P). Let's delve into the focal points of this discussion. My aim is to offer a concise breakdown of the content that lies ahead: I'll be guiding you through the process of migrating a "simple" API connector to the Low-code CDK.

Let's start with the basics:

Prerequisites:

  • python3 (ideally Python 3.10)

  • docker (if you don't have it installed, follow the official Docker documentation to set it up)

  • API key of your source

Local setup of Airbyte:

Step 1:

Fork the Airbyte repository and make a local clone of it on your system. If you don't know the commands, refer to this guide - https://docs.github.com/en/get-started/quickstart/fork-a-repo

Clone Command -

git clone https://github.com/<your username>/airbyte.git

Step 2:

Once you have the repository cloned on your local system, cd into the airbyte root directory

cd airbyte

and run the following command

./run-ab-platform.sh
or
sudo ./run-ab-platform.sh   # if you don't have the proper privileges set for Docker

This will set up the Airbyte platform on your system. To validate the setup, check for the Airbyte banner. Once you see it, visit http://localhost:8000. By default, the username is airbyte and the password is password (love the creativity here :P).

Step 3: (optional)

If you are unable to modify or save files, use the following command in airbyte's root directory to change ownership (replace <username> with your own user). This applies to Linux-based OSes like Ubuntu, and to macOS as well.

sudo chown -R <username>:<username> .

We are done with the formalities. Let the migration of connectors begin!!!!

Getting started with the templating

We need to first set up the new connector source to be migrated.

Before we start, we need to make a backup of the connector we want to migrate.

For my use case, I did it by renaming the source directory, adding a "-backup" suffix to it (something suggested by @bktcodedev). Make sure you do not commit this directory. The backup will help you review the functionality the existing connector supports.
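In my case, that meant something like the following, run from the airbyte-integrations/connectors directory:

mv source-openweather source-openweather-backup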

Now you can create the new source.

In order to do so, first move into the template generator directory and execute the ./generate.sh bash file. The commands for this are as follows:

cd airbyte-integrations/connector-templates/generator
./generate.sh

This will bring up an interactive helper application.

Use the arrow keys to pick a template from the list. Select the Configuration Based Source template and then input the name of your connector.

The application will create a new directory under airbyte-integrations/connectors/ named after your new connector: the generator creates a module called source-<connector-name>; for me it was source-openweather.

Now you have the source configured. What's next? Creating a virtual environment and installing the dependencies.

Creating a virtual environment and installing the dependencies

Let's start by creating a Python virtual environment for our source. The commands below assume that python points to a Python version 3.9.0 or newer. On some systems, python points to a Python 2 installation while python3 points to Python 3; if that applies to your system, substitute python3 in the commands. Subsequent python invocations will then use the virtual environment created for the connector. (Personal note: in my case, the Python version is 3.10.)

cd ../../connectors/source-openweather
python -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

These steps create an initial Python environment and install the dependencies required to run an API Source connector.

Let's verify everything works as expected by running the Airbyte spec operation:

python main.py spec

The output below is an indication that we are on the righteous path. Let's GOOO!!!!

{"type": "SPEC", "spec": {"documentationUrl": "https://docsurl.com", "connectionSpecification": {"$schema": "http://json-schema.org/draft-07/schema#", "title": "Python Http Tutorial Spec", "type": "object", "required": ["TODO"], "additionalProperties": false, "properties": {"TODO: This schema defines the configuration required for the source. This usually involves metadata such as database and/or authentication information.": {"type": "string", "description": "describe me"}}}}}

This is a simple sanity check to make sure everything is wired up correctly. More details on the spec operation can be found in the Airbyte docs under Basic Concepts and Defining Stream Schemas.

The next step is to go through the directory structure and list the important files, the ones we will add details to or update.

Files of interest (where the dough's at)

We are prepared to commence the implementation of the connector.

Throughout this tutorial, we will be modifying several files that were produced by the code generator:

  1. source-openweather/source_openweather/manifest.yaml: This document serves as the connector's manifest. It outlines how data should be extracted from the API source and specifies the configurable inputs for the connector.

  2. source-openweather/integration_tests/configured_catalog.json: This file represents the connector's catalog. It elucidates the available data within a source.

In addition, we will be generating the following files:

  1. source-openweather/secrets/config.json: This configuration file will be employed for testing the connector. Its structure must adhere to the specifications outlined in the spec file.

  2. source-openweather/integration_tests/invalid_config.json: An invalid configuration file, which follows the schema defined in the spec file, will also be created for testing purposes.

  3. source_openweather/schemas/onecall.json: This file contains the schema definition for the stream we are about to implement: the JSON schema of the API response.

Starting with the big guns:

manifest.yaml:

This is the file that controls it all, the command center for the connector. It generally has two parts:

  • definitions: This hosts all the details for different components like record selectors, retrievers, and requesters.

  • spec: This section hosts the configuration details required for the integration.

definitions:

There are a few components present in the definitions:

  1. record selector: This component defines how the connector extracts records from the API response. The selector type RecordSelector allows us to pull the data out of the response; its full definition can be found in the Airbyte docs. The point to note here is that the selector defines the path to the data. Say you want a specific part of your API response returned instead of all the jargon attached to it; we do not want our responses to be too confusing. To make sure we extract only the crème de la crème, we use a record selector (see the short sketch below, just before the full definitions block).

  2. requester: This component prepares the HTTP requests to send to the source API. Presently there exists a sole implementation, HttpRequester, which is characterized by:

    1. url_base: The root of the API source

    2. http_method: the HTTP method to use (GET or POST)

    3. A request options provider: Defines the request parameters (query parameters), headers, and request body to set on outgoing HTTP requests

      1. For Openweather we need to send longitude, latitude, and appid as request parameters. For that we used request_parameters, the option that lets us define the required request parameters.

        Fun fact: In order to implement conditional selections, one can use Jinja's conditional statements. For example, I used the condition below to make sure that the latitude values are always within the range of -90 to 90:

          lat: "{% if -90.00 <= config['lat']|float <= 90.00 %}{{ config['lat'] }}{% else %} WRONG LATITUDE{% endif %}"
        
    4. authenticator: Defines how to authenticate to the source

    5. error_handler: Defines how to handle errors.

  3. retriever: The data retriever defines how to read the data for a Stream and acts as an orchestrator for the data retrieval flow

  4. stream_definitions: This section defines the streams you want to include in your connector. For Openweather there is only one stream, onecall. To define a stream you need a base_stream, defined using DeclarativeStream, which can be referenced by multiple streams. For my onecall_stream, the stream-related configuration is defined here, such as path (the specific endpoint to fetch data from for a resource) and name.

There are other components, namely pagination and incremental sync, which are not implemented here since this connector does not require them. (*sigh*)
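To make the record selector concrete: if an API buried its records under, say, response["data"]["records"], the selector would point at that path via field_path. This is a hypothetical sketch, not Openweather's shape; as you'll see below, Openweather returns the record at the top level, hence the empty field_path:

selector:
  type: RecordSelector
  extractor:
    type: DpathExtractor
    field_path: ["data", "records"]  # keep only the records nested under data -> records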

The definitions section that I used for the Openweather source:

definitions:
  selector:
    type: RecordSelector
    extractor:
      type: DpathExtractor
      field_path: []


  requester:
    type: HttpRequester
    url_base: "https://api.openweathermap.org/data/3.0/"
    http_method: "GET"
    request_parameters:
      lat: "{% if -90.00 <= config['lat']|float <= 90.00 %}{{ config['lat'] }}{% else %} WRONG LATITUDE{% endif %}"
      lon: "{% if -180.00 <= config['lon']|float <= 180.00 %}{{ config['lon'] }}{% else %}WRONG LONGITUDE{% endif %}"
      appid: "{{ config['appid'] }}"
      lang: "{{ config.get('lang')}}"
      units: "{{ config.get('units')}}"
    error_handler:
      response_filters:
          - http_codes: [ 500 ]
            action: RETRY
  retriever:
    type: SimpleRetriever
    record_selector:  
      $ref: "#/definitions/selector"
    paginator:
      type: NoPagination
    requester:
      $ref: "#/definitions/requester"
  base_stream:
    type: DeclarativeStream
    retriever:
      $ref: "#/definitions/retriever"

  onecall_stream:
    $ref: "#/definitions/base_stream"
    name: "onecall"
    retriever:
      $ref: "#/definitions/retriever"
    $parameters:
      path: "onecall"

spec:

This section defines the source specification: connector metadata and how the source can be configured. It hosts details like connection_specification, properties, etc.

There are a few components present in the spec:

  1. documentation_url: The link to the documentation for the configured source

  2. connection_specification:

    1. required: Lists the properties that are mandatory for the integration; none of these can be skipped.

    2. properties: The list of all the properties, required as well as optional.

    3. additionalProperties: A boolean value specifying whether properties beyond those listed are allowed.

This section can be copied directly from the existing spec.json file in the original source connector. Using any available online JSON-to-YAML converter, we can get it converted to YAML and use the result directly.
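If you'd rather do the conversion offline, here is a minimal Python sketch using the json module and the third-party PyYAML package (pip install pyyaml); the backup path below is my assumption, adjust it to wherever your spec.json lives:

import json
import yaml  # third-party: pip install pyyaml

# hypothetical path: the spec.json preserved in the backup directory
with open("../source-openweather-backup/source_openweather/spec.json") as f:
    spec = json.load(f)

# print YAML ready to paste under the spec section of manifest.yaml
print(yaml.safe_dump(spec, sort_keys=False, allow_unicode=True))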

The spec section that I used for the Openweather source:


spec:
  type: Spec
  documentation_url: https://docs.airbyte.com/integrations/sources/openweather
  connection_specification:
    title: Openweather Spec
    type: object
    required:
      - lat
      - lon
      - appid
    additionalProperties: true
    properties:
      lat:
        type: string
        pattern: "^[-]?\\d{1,2}(\\.\\d+)?$"
        description: "Latitude, decimal (-90; 90). If you need the geocoder to automatic convert city names and zip-codes to geo coordinates and the other way around, please use our Geocoding API"
        examples:
          - "45.7603"
          - "-21.249107858038816"
      lon:
        type: string
        pattern: "^[-]?\\d{1,2}(\\.\\d+)?$"
        description: "Longitude, decimal (-180; 180). If you need the geocoder to automatic convert city names and zip-codes to geo coordinates and the other way around, please use our Geocoding API"
        examples:
          - "4.835659"
          - "-70.39482074115321"
      appid:
        type: string
        description: "API KEY"
        airbyte_secret: true
      units:
        type: string
        description: "Units of measurement. standard, metric and imperial units are available. If you do not use the units parameter, standard units will be applied by default."
        enum:
          - standard
          - metric
          - imperial
        examples:
          - "standard"
          - "metric"
          - "imperial"
      # dt:
      #   type: string
      #   description : "Date in UNIX format"
      #   example:
      #     - '1693023447'
      only_current:
        type: boolean
        description: "True for particular day"
        examples:
          - true
      lang:
        type: string
        description: You can use lang parameter to get the output in your language.
          The contents of the description field will be translated. See <a href="https://openweathermap.org/api/one-call-api#multi">here</a>
          for the list of supported languages.
        enum:
          - af
          - al
          - ar
          - az
          - bg
          - ca
          - cz
          - da
          - de
          - el
          - en
          - eu
          - fa
          - fi
          - fr
          - gl
          - he
          - hi
          - hr
          - hu
          - id
          - it
          - ja
          - kr
          - la
          - lt
          - mk
          - 'no'
          - nl
          - pl
          - pt
          - pt_br
          - ro
          - ru
          - sv
          - se
          - sk
          - sl
          - sp
          - es
          - sr
          - th
          - tr
          - ua
          - uk
          - vi
          - zh_cn
          - zh_tw
          - zu
        examples:
          - en
          - fr
          - pt_br
          - uk
          - zh_cn
          - zh_tw

Coming to the essentials

configured_catalog.json

The catalog is used to test the connection as well as to read data from the stream. The catalog is structured as a list of AirbyteStream. In the case of a database, a "stream" is analogous to a table. This configuration defines the properties that aid the reading of data.

Let's walk through what each field in a stream means.

  • name - The name of the stream.

  • supported_sync_modes - This field lists the types of data replication that the source supports. The possible values in this array are FULL_REFRESH and INCREMENTAL.

  • json_schema - This field is a JsonSchema object that describes the structure of the data. Each key in the properties object corresponds to a field in the stream's records (for a database source, a column in the table).

configured_catalog for Openweather:

{
  "streams": [
    {
      "stream": {
        "name": "onecall",
        "json_schema": {},
        "supported_sync_modes": ["full_refresh"]
      },
      "sync_mode": "full_refresh",
      "destination_sync_mode": "overwrite"
    }
  ]
}

Once the entries are populated, we need to check whether the connection with the source is established properly. For that, run the following command:

python main.py check --config secrets/config.json

A successful run returns output confirming the connection succeeded.
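Illustratively, it's a CONNECTION_STATUS message along these lines (log lines omitted; exact output may vary):

{"type": "CONNECTION_STATUS", "connectionStatus": {"status": "SUCCEEDED"}}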

schema.json:

Think of schema files like those trusty old sneakers you've got tucked away in the closet: you only really dig them out when your funky dance moves suddenly become cool again. Since the API we are using has not changed its response format, we will reuse the existing schema file as-is, with one minor addition: "additionalProperties": true. This property allows extra fields without explicit definitions. In case you do not have the schema handy, use ChatGPT to generate the JSON schema from a sample API response. (This AI is making me lazy, LOL!)
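If you'd rather script it than prompt an AI, here is a minimal sketch that derives a draft-07 schema from a saved sample response using the third-party genson library (pip install genson); genson is not part of the Airbyte toolchain, and the sample file name is hypothetical:

import json

from genson import SchemaBuilder  # third-party: pip install genson

# hypothetical file: a raw onecall API response saved to disk beforehand
with open("sample_onecall_response.json") as f:
    sample = json.load(f)

builder = SchemaBuilder(schema_uri="http://json-schema.org/draft-07/schema#")
builder.add_object(sample)
schema = builder.to_schema()
schema["additionalProperties"] = True  # allow top-level fields the sample lacked

print(json.dumps(schema, indent=2))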

Schema I used:

source_openweather/schemas/onecall.json

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "type": "object",
  "additionalProperties": true,
  "properties": {
    "lat": {
      "type": "number"
    },
    "lon": {
      "type": "number"
    },
    "timezone": {
      "type": "string"
    },
    "timezone_offset": {
      "type": "number"
    },
    "current": {
      "type": "object",
      "additionalProperties": true,
      "properties": {
        "dt": {
          "type": "number"
        },
        "sunrise": {
          "type": "number"
        },
        "sunset": {
          "type": "number"
        },
        "temp": {
          "type": "number"
        },
        "feels_like": {
          "type": "number"
        },
        "pressure": {
          "type": "number"
        },
        "humidity": {
          "type": "number"
        },
        "dew_point": {
          "type": "number"
        },
        "uvi": {
          "type": "number"
        },
        "clouds": {
          "type": "number"
        },
        "visibility": {
          "type": "number"
        },
        "wind_speed": {
          "type": "number"
        },
        "wind_deg": {
          "type": "number"
        },
        "weather": {
          "type": "array"
        },
        "rain": {
          "type": "object",
          "additionalProperties": true,
          "properties": {
            "1h": {
              "type": "number"
            }
          }
        }
      }
    },
    "minutely": {
      "type": "array"
    },
    "hourly": {
      "type": "array"
    },
    "daily": {
      "type": "array"
    },
    "alerts": {
      "type": "array"
    }
  }
}

secrets/config.json:

This file contains the access keys and the other properties the connector needs to function properly. Any values defined here won't be committed to git, as the secrets directory is git-ignored.

config.json for Openweather:

{
  "lat": "33.44",
  "lon": "-94.04",
  "appid": "api-key",
  "lang": "fr",
  "units": "standard"
}

integration_tests/invalid_config.json:

This file contains an intentionally invalid configuration (note the wrong API key), used by the integration tests to verify that the connector fails as expected.

invalid_config.json for Openweather:

{
  "lat": "12.1",
  "lon": "-43.1",
  "appid": "wrongkey"
}

Now we are done with the basic configurations. It is time to harvest what we have sown. Get ready for the big battle.

Does the connector return data?

To test whether we can read data from the source, we need our configured_catalog.json, schema.json, and manifest.yaml defined. For our purposes, we have already configured them above.

Now, to validate the run, execute the following command:

python main.py read --config secrets/config.json --catalog integration_tests/configured_catalog.json

Here the read operation pulls records from the source and prints them as Airbyte RECORD messages.
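Abbreviated and illustrative (your values will differ):

{"type": "RECORD", "record": {"stream": "onecall", "data": {"lat": 33.44, "lon": -94.04, "timezone": "America/Chicago", "...": "..."}, "emitted_at": 1693000000000}}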

"Houston, We have secured connection! I repeat we have a connection! "

If RECORD messages like these show up, the data is being read successfully. If you are facing any issues, try to debug, or use Kapa, the saviour bot on the Airbyte Slack.

What's left ???????

Testing the connector

So, testing might make you believe in God again; you may find yourself wishing for his return to Earth, hoping he would assist you in passing the acceptance tests. Then again, if everything is in order, it's a smooth ride. To configure the tests, update the acceptance-test-config.yml file, which hosts the tests you want your connector to pass. Some of the test types are:

  • Test Connection: Verifies that a check operation issued to the connector with the input config file returns a successful response

  • Test Discovery: Verifies that when a discover operation is run on the connector with the given config file, the connector produces a valid catalog

  • Test Basic Read: With all streams in the input catalog configured in full refresh mode, verifies that a read operation produces some RECORD messages

You can reuse the original acceptance-test-config.yml from the backup. Just make sure the formatting is proper and the tests are valid.

The acceptance-test-config.yml for Openweather:

connector_image: airbyte/source-openweather:dev
acceptance_tests:
  connection:
    tests:
      - config_path: "secrets/config.json"
        status: "succeed"
      - config_path: "integration_tests/invalid_config.json"
        status: "failed"
  discovery:
    tests:
      - config_path: "secrets/config.json"
        backward_compatibility_tests_config:
            disable_for_version: "0.1.6"
  basic_read:
    tests:
      - config_path: "secrets/config.json"
        configured_catalog_path: "integration_tests/configured_catalog.json"
  incremental: 
    bypass_reason: "This connector does not implement incremental sync"

Once you have this configured, start with the prayers, I mean, with the next steps, beginning with the creation of a Docker image for the source. Run the following command in the connector's root directory:

docker build . -t airbyte/source-openweather:dev

Make sure to name your Docker image properly. Once it builds successfully, we need to run the test suite. There are two ways to run it: through Docker or directly in the terminal. To run it in the terminal, execute the following command:

 python -m pytest integration_tests -p integration_tests.acceptance

A successful run will report all the tests as passed.

Or you can go the Docker way. For that, run the following command from the connector root:

./acceptance-test-docker.sh

Once you run it and the tests pass, you can thank the heavens: you have successfully migrated a connector.

Conclusion

With minimal coding experience and a knack for finding ways to cut corners, you have successfully migrated the connector to the Low-code CDK. Following this step-by-step guide/tutorial/personal experience, you have learned how to migrate an existing source connector to Low-code: how to create and configure the manifest.yaml file, reuse the existing schema.json file with minor changes, and define the acceptance-test-config.yml.


Special mentions:

Thank you for your help :)

  • Avinash Gour

  • bktcodedev

  • Mikhail Masyagin

Disclaimer: This guide is written by a developer who tried his hand at this migration for the first time. Please do reach out with suggestions and advice.
