Automate Google Analytics (GA4) with MindsDB: Building a New Integration

My Journey with Open Source and MindsDB

In early 2023, I began my journey into open source contributions. I started with simple python code and similar tasks. As I gained more experience, I became eager to find a project where I could contribute with passion and enhance my python skills. That's when I stumbled upon MindsDB , a fantastic repository on GitHub with a vibrant community of contributors and maintainers.

I began my journey into MindsDB, discovering how it enables the automation of machine learning tasks through straightforward SQL commands. It's amazing how you can apply this automation to your own data, such as emails, calendars, and much more. Intrigued, I decided to clone the mindsdb repository and began my quest to find an issue to contribute to.

Fortunately, I came across a problem with Google Analytics and asked to be assigned to it. As soon as I got the task, I enthusiastically delved into working on this integration, and the hashnode hackathon motivated me to put in a substantial amount of effort to make it happen.


Learning Google Analytics API

If you search for "google analytics api" on Google, you'll see that the Core Reporting API appears as the top search result. However, upon further exploration of the Google Analytics API, I discovered that there are actually two different types of APIs:

Google Analytics 4 API

GA4, also known as Google Analytics 4, represents the future of Analytics as it gathers event-based information from websites and apps alike. This advanced version offers three distinct types of APIs, namely the admin API, data API, and user deletion API

Universal Analytics API

Google Analytics 4 is the latest version, replacing the older Universal Analytics. As a result, Universal Analytics is now deprecated. If you're exploring Google Analytics APIs, you'll come across numerous articles guiding you on migrating from Universal Analytics to GA4.

Among the various APIs available, the core reporting API is particularly prominent and can be easily found in the top search results.

In the end, I made the decision to utilize the latest version of Google Analytics, known as Google Analytics 4. Out of the three available APIs, I opted for the admin API. This particular API enables programmatic access to the configuration data of Google Analytics 4 (GA4) and is exclusively compatible with GA4 properties.


Designing the API

One of the most effective approaches to develop a new integration is by designing the API for it. To ensure its functionality, I began testing the API by making various requests and evaluating the CRUD operations.

However, I encountered a new challenge along the way: determining the specific data I wanted to observe. After careful consideration, I opted to proceed with Conversion Events, click here form more information.

I started running numerous requests to test the API before actually implementing it. During this process, I discovered the correct methods to retrieve the data. After putting in a significant amount of effort, I began building the integration.

Initialize Google Analytics Handler

The issue description recommends checking the twitter handler as a starting point for creating the new handler. However, I came across the gmail and google calendar handlers while looking for references for the new integration. These handlers have similar methods, except they use different routes.

class GoogleAnalyticsHandler(APIHandler):
    """A class for handling connections and interactions with the Google Analytics Admin API.

    Attributes:
        credentials_file (str): The path to the Google Auth Credentials file for authentication
        and interacting with the Google Analytics API on behalf of the user.

        Scopes (List[str], Optional): The scopes to use when authenticating with the Google Analytics API.
    """

    name = 'google_analytics'

    def __init__(self, name: str, **kwargs):
        super().__init__(name)
        self.page_size = 500
        self.connection_args = kwargs.get('connection_data', {})

        self.credentials_file = self.connection_args['credentials_file']
        self.property_id = self.connection_args['property_id']
        if self.connection_args.get('credentials'):
            self.credentials_file = self.connection_args.pop('credentials')

        self.scopes = self.connection_args.get('scopes', DEFAULT_SCOPES)
        self.service = None
        self.is_connected = False
        conversion_events = ConversionEventsTable(self)
        self.conversion_events = conversion_events
        self._register_table('conversion_events', conversion_events)

The connection with Google Analytics Admin API is managed by the GoogleAnalyticsHandler class, which utilizes two primary parameters: self.credentials_file and self.property_id.

self.credentials_file: The JSON file that is obtained when you enable the Google Analytics Admin API in a GCP project, please click here for further details.

self.property_id: The property id of your Google analytics website, here is an article on how to get this property id

Create Google Analytics Connection

Take a look at the following lines of code which showcase the methods utilized for connecting to the Google Analytics Admin API.

    def create_connection(self):
        creds = None

        if os.path.isfile(self.credentials_file):
            creds = service_account.Credentials.from_service_account_file(self.credentials_file, scopes=self.scopes)

        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            elif not os.path.isfile(self.credentials_file):
                raise Exception('Credentials must be a file path')

        return AnalyticsAdminServiceClient(credentials=creds)

    def connect(self):
        """Authenticate with the Google Analytics Admin API using the credential file.

        Returns
        -------
        service: object
            The authenticated Google Analytics Admin API service object.
        """
        if self.is_connected is True:
            return self.service

        self.service = self.create_connection()
        self.is_connected = True

        return self.service

    def check_connection(self) -> StatusResponse:
        """
        Check connection to the handler.

        Returns
        -------
        response
            Status confirmation
        """
        response = StatusResponse(False)

        try:
            # Call the Google Analytics API
            service = self.connect()
            result = service.list_conversion_events(parent=f'properties/{self.property_id}')

            if result is not None:
                response.success = True
        except HttpError as error:
            response.error_message = f'Error connecting to Google Analytics api: {error}.'
            log.logger.error(response.error_message)

        if response.success is False and self.is_connected is True:
            self.is_connected = False

        return response

To ensure a smooth connection, we run a request that displays a comprehensive list of all conversion events in your GA4 property.

    def native_query(self, query_string: str = None) -> Response:
        ast = parse_sql(query_string, dialect="mindsdb")

        return self.query(ast)

    def get_api_url(self, endpoint):
        return f'{endpoint}/{self.property_id}'

The native_query method allows us to run tests on the methods without relying on the mindsdb editor. On the other hand, the get_api_url method saves us from typing the API endpoint every time we need it.

You can now easily link up with Google Analytics through the MindsDB editor.


Google Analytics Tables

I have created a new python file named google_analytics_tables.py. This file will serve as the foundation for all the logic. Additionally, I have initialized a ConversionEventsTable class within it. This class will handle all the CRUD operations, such as select, insert, update, and delete, for SQL commands.

class ConversionEventsTable(APITable):

    def select(self, query: ast.Select) -> pd.DataFrame:
        """
        Gets all conversion events from google analytics property.

        Args:
            query (ast.Select): SQL query to parse.

        Returns:
            Response: Response object containing the results.
        """
        # Parse the query to get the conditions.
        conditions = extract_comparison_conditions(query.where)
        # Get the page size from the conditions.
        params = {}
        for op, arg1, arg2 in conditions:
            if arg1 == 'page_size':
                params[arg1] = arg2
            else:
                raise NotImplementedError

        # Get the order by from the query.
        if query.order_by is not None:
            raise NotImplementedError

        if query.limit is not None:
            raise NotImplementedError

        # Get the conversion events from the Google Analytics Admin API.
        conversion_events = pd.DataFrame(columns=self.get_columns())
        result = self.get_conversion_events(params=params)
        conversion_events_data = self.extract_conversion_events_data(result.conversion_events)
        events = self.concat_dataframes(conversion_events, conversion_events_data)

        selected_columns = []
        for target in query.targets:
            if isinstance(target, ast.Star):
                selected_columns = self.get_columns()
                break
            elif isinstance(target, ast.Identifier):
                selected_columns.append(target.parts[-1])
            else:
                raise ValueError(f"Unknown query target {type(target)}")

        if len(events) == 0:
            events = pd.DataFrame([], columns=selected_columns)
        else:
            events.columns = self.get_columns()
            for col in set(events.columns).difference(set(selected_columns)):
                events = events.drop(col, axis=1)
        return events
    def get_conversion_events(self, params: dict = None):
        """
        List all conversion events in your GA4 property
        Args:
            params (dict): query parameters
        Returns:
            ConversionEvent objects
        """
        service = self.handler.connect()
        page_token = None
        url = self.handler.get_api_url('properties')

        while True:
            request = ListConversionEventsRequest(parent=url,
                                                  page_token=page_token, **params)
            result = service.list_conversion_events(request)

            page_token = result.next_page_token
            if not page_token:
                break
        return result

In the provided code, you will notice a connection between the select and get_conversion_events methods. We pass the query to the select method and then proceed to the get_conversion_events method. This is where the actual API logic is implemented, retrieving all conversion events from Google Analytics.

I performed the identical task for insert, update, and delete operations, and their corresponding methods are create_conversion_event, update_conversion_event, and delete_conversion_event.

I have implemented two methods within this class to manage the data and retrieve it as a dataframe. You can locate these methods in the code lines below.

    @staticmethod
    def extract_conversion_events_data(conversion_events):
        """
        Extract conversion events data and return a list of lists.
        Args:
            conversion_events: List of ConversionEvent objects
        Returns:
            List of lists containing conversion event data
        """
        conversion_events_data = []
        for conversion_event in conversion_events:
            data_row = [
                conversion_event.name,
                conversion_event.event_name,
                conversion_event.create_time,
                conversion_event.deletable,
                conversion_event.custom,
                conversion_event.ConversionCountingMethod(conversion_event.counting_method).name,
            ]
            conversion_events_data.append(data_row)
        return conversion_events_data

    def concat_dataframes(self, existing_df, data):
        """
        Concatenate existing DataFrame with new data.
        Args:
            existing_df: Existing DataFrame
            data: New data to be added to the DataFrame
        Returns:
            Concatenated DataFrame
        """
        return pd.concat(
            [existing_df, pd.DataFrame(data, columns=self.get_columns())],
            ignore_index=True
        )

Testing Google Analytics Handler

Let's put coding aside for now and dive into testing the handler to see if there are any issues.

Unit Testing

I made a python file named test_google_analytics_handler.py inside the tests folder. In this file, I perform unit testing for connection, tables, and SQL queries. These tests are extremely helpful in debugging and troubleshooting the problems you may encounter.

from mindsdb.integrations.handlers.google_analytics_handler.google_analytics_handler import GoogleAnalyticsHandler
from mindsdb.api.mysql.mysql_proxy.libs.constants.response_type import RESPONSE_TYPE
import unittest


class GoogleAnalyticsTest(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        cls.kwargs = {
            "connection_data": {
                "credentials_file": "/home/talaat/Downloads/credentials.json",
                "property_id": '<YOUR_PROPERTY_ID>'
            }
        }

        cls.handler = GoogleAnalyticsHandler('test_google_analytics_handler', **cls.kwargs)

    def test_0_check_connection(self):
        assert self.handler.check_connection()

    def test_1_get_tables(self):
        tables = self.handler.get_tables()
        print(tables.type)
        print(RESPONSE_TYPE.TABLE)
        assert tables.type is RESPONSE_TYPE.TABLE

    def test_2_native_query_select(self):
        query = 'SELECT * FROM conversion_events'
        result = self.handler.native_query(query)
        assert result.type is RESPONSE_TYPE.TABLE

    def test_3_native_query_update(self):
        query = 'UPDATE conversion_events SET countingMethod = 1 WHERE name = "properties/371809744/conversionEvents/6637248600"'
        result = self.handler.native_query(query)
        assert result.type is RESPONSE_TYPE.OK

    def test_4_native_query_delete(self):
        query = 'DELETE FROM conversion_events WHERE name = "properties/371809744/conversionEvents/6622916665"'
        result = self.handler.native_query(query)
        assert result.type is RESPONSE_TYPE.OK

    def test_5_native_query_insert(self):
        query = "INSERT INTO conversion_events (event_name, countingMethod) VALUES ('event_4', 2)"
        result = self.handler.native_query(query)
        assert result.type is RESPONSE_TYPE.OK


if __name__ == '__main__':
    unittest.main()

MindsDB Editor

Now that we have completed our tests, we are fully prepared to test the handler using the MindsDB editor. Below are the tests I have conducted locally, as the handler has not been merged yet.

After completing the changes, I proceed to commit them and create a pull request to add the new handler to the staging branch. With the assistance of mindsdb maintainers such as Minura Punchihewa and Tarun Chawla , I am now ready to merge this handler. I would like to express my gratitude to them for their help. If you are curious about the pull request, check out the link below.


Google Analytics Docs

I didn't just stop there. I took the initiative to document my work and create a comprehensive guide to assist people in using this integration correctly. The following is my documentation by running mindsdb documentation locally. And yes, you guessed it right, I'm eagerly waiting for this to be merged as well.


How can this integration be beneficial?

There are numerous ways to utilize this data. You can construct your own recommendation system using the mindsdb editor, which suggests new conversion events that you can create and keep track of.

Additionally, you have the option to construct a predictive model that forecasts conversion events using the existing conversion data. This will greatly assist you in accurately tallying the events and uncovering valuable insights about your users.

You can also utilize mindsdb to build agents. For detailed instructions on creating fully functional agents, refer to the mindsdb documentation. Additionally, you can leverage openai to develop models based on gpt3.5 for querying your data or any other inquiries.


Will you be continuing to contribute to MindsDB?

Yes, indeed! I've discovered that MindsDB is one of those amazing companies that I absolutely adore working with. Their team is exceptional, and they're putting in a tremendous effort to make this product outstanding. As for me, I'll be persistently working on enhancing this integration by introducing additional features and working with multiple tables. Not only that, but I'll also be actively involved in developing new integrations.


Code

LinkedIn

GitHub

12
Subscribe to my newsletter

Read articles from Talaat Mohamed Hasanin directly inside your inbox. Subscribe to the newsletter, and don't miss out.

Written by

Talaat Mohamed Hasanin
Talaat Mohamed Hasanin