Dataform: schedule daily updates using Cloud Functions

This post is part of a series of Dataform posts. I assume you already read the previous posts and are ready to trigger your GA4 data pipelines.

The Goal

In this post I’ll show how to run Dataform for these two cases:

  • As soon as GA4 tables are ready
  • Hourly (based on Google Cloud Scheduler)

To make things a bit more interesting, I’ll add a few additional requirements for the GA4 daily tables trigger:

  • Get a table name and process only models related to the updated table
  • If we have more than one GA4 dataset, execute only models from the same dataset as the updated table.
  • Don’t use Airflow or Google Workflows. They’re great tools, but we don’t need an additional orchestration tool for this simple case, as Dataform manages the dependencies (thanks to ref). So we only need to run the flow at the right time.

Note: unfortunately, Google doesn’t update the GA4 test dataset. But we will prepare everything and be ready to process new Google Store sales; not a single Google t-shirt will slip past us. Of course, we can still do a few manual tests while waiting.

The plan

First, we will set dataset_id as a tag on each model. Then, on every GA4 table update, we will generate a Pub/Sub event containing dataset_id and table_name. Based on this event, we will start a Dataform execution with the tag equal to dataset_id and pass table_name as a configuration variable.

For example, imagine we have a GA4 property with ID 123456 whose data is exported to the analytics_123456 dataset. Our plan is:

  1. For all models like sessions.sqlx, page_view.sqlx, etc., we set the tags value to analytics_123456;
  2. For the daily export, Google creates a new table events_22220101;
  3. We subscribe to this event, execute the Dataform actions tagged analytics_123456, and pass the table name events_22220101 as a variable.

Thus, when Google exports GA4 data for one property, we will run only the actions for that specific property.
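
For reference, the Pub/Sub message our trigger function will eventually publish for this example carries roughly the following payload (a sketch in Python; the field names match the test message we will use later in the post, and the project ID is a placeholder):

# Sketch of the message published for the example property 123456.
dataform_run_message = {
    "project_id": "my-gcp-project",         # placeholder project ID
    "region": "us-central1",
    "repository_id": "dataform-ga4events",
    "git_commitish": "main",
    "tags": ["analytics_123456"],           # the dataset_id doubles as the tag
    "dataset_id": "analytics_123456",
    "last_event_table": "events_22220101",  # the freshly exported daily table
}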

Preparatory stage

In this post I will use the code from the previous post and a separate repository with the Cloud Functions.

But before we can start playing with Cloud Functions, we need to slightly change the Dataform actions: add tags and global variables.

Rename models

First, I usually rename all files and models to something dataset-specific. For files, I add the GA4 property ID at the end: for example, I rename sessions.sqlx to sessions_123456.sqlx, where 123456 is the GA4 property ID. But for this tutorial we use only one test dataset, so I keep the file names as they are.

The next step is to rename all the models. In definitions/staging/events/declarations.js, add a postfix to all constants that hold table names:

const eventName = eventConfig.eventName;
const eventTbl = `${eventConfig.eventName}_sample_ecommerce`;
const eventTempTbl = `${eventConfig.eventName}_tmp_sample_ecommerce`;
const eventErrorsTbl = `${eventConfig.eventName}_errors_sample_ecommerce`;
const eventIntradayTbl = `${eventConfig.eventName}_intraday_sample_ecommerce`;
const eventView = `${eventConfig.eventName}_view_sample_ecommerce`;

So for each GA4 property I create separate .sqlx or .js files, and in each file I add the GA4 property ID to the end of all model names. After all models are executed, the dataform_staging dataset will contain tables like sessions_<property_id>, page_view_<property_id>, etc. for each GA4 property.

Add tags

In Dataform we can start an execution with a tag; in that case only the actions carrying this specific tag will run, so we can control which models to execute.

So in definitions/staging/events/declarations.js we need to find all the config blocks and add tags, something like this:

.config({type: "table",schema: "dataform_staging",tags:["ga4_obfuscated_sample_ecommerce"]})

We also add the ga4_hourly tag to the intraday tables and views.

.config({type: "table",schema: "dataform_staging",tags:["ga4_hourly", "ga4_obfuscated_sample_ecommerce"]})

To check that you did everything correctly, you can rerun all actions by tag.

Click the Start Execution button, hover over the Tags menu, and click ga4_obfuscated_sample_ecommerce.

Dataform Start Execution

You should see the right panel with all our models; click Start Execution to check that everything is ready for automation.

Move the table name to the global config

The last preparation step is moving the GA4_TABLE constant (holding the last day’s table name) from includes/constants.js to the global config dataform.json.

When we run Dataform via the API, we can set a new value for a configuration variable, and we will set GA4_TABLE to the name of the updated table.

But for now, delete GA4_TABLE from includes/constants.js.

const GA4_DATABASE = 'bigquery-public-data';
module.exports = {GA4_DATABASE}

And add it to dataform.json:

{
    "defaultSchema": "dataform",
    "assertionSchema": "dataform_assertions",
    "warehouse": "bigquery",
    "defaultDatabase": "cafe-assistant-295900",
    "defaultLocation": "US",
    "vars": {
        "GA4_TABLE": "events_20210131",
    }
}

So we add the vars key and define the GA4_TABLE value.

And once again, thanks to Dataform! In the interface, the file tree will show compilation errors:

Dataform Compilation Errors

It shows which files use the deleted variable, so we can quickly fix them. In definitions/staging/events/declarations.js, change constants.GA4_TABLE to dataform.projectConfig.vars.GA4_TABLE.

In this tutorial we have only one dataset, but if you have more than one, you also need to specify the dataset name in all your ref calls; otherwise you will get a name conflict (more than one model with the same name).

So change ${ctx.ref('events_*')} to ${ctx.ref(constants.GA4_DATABASE, dataform.projectConfig.vars.GA4_DATASET, "events_*")}.

In our case we also set the database (constants.GA4_DATABASE), but usually you query data from the same project, so you don’t need to specify a database.

OK, and we need to make the same changes in definitions/sources/ga4/declarations.js.

Good news everyone, we are ready to trigger our project.

You can see the final state here: Dataform models with tags for daily updates

Trigger Dataform on GA4 table update

Implementation steps

The process will take a few steps:

  1. Add a logging sink for GA4 table updates; the sink will generate a Pub/Sub event on each table update;
  2. Create a Cloud Function that subscribes to the event from the previous step, extracts the dataset and table names, and generates a new Pub/Sub event with the parameters needed to run a Dataform execution;
  3. Create a Cloud Function that triggers Dataform based on the previous Pub/Sub event.

Yes, I added one more step; we could easily combine the last two into one. But later we will need to execute Dataform based on Cloud Scheduler, and the format of the incoming events will differ. That’s why I added the second step: extract the table and dataset names and generate a new Pub/Sub event with the data in a “prepared format” to run Dataform.

As another benefit, you can add more than one subscriber to the GA4 table update event: for example, notify an external system, send Slack messages, or create a corporate gambling service and take bets on whether Google updates the data before noon or later.

Init gcloud configuration

For all these steps I will use the gcloud CLI. The most important reason: everybody in the marketing department will know you are a developer because “he uses these black windows”. And second, we will reuse the same commands later for automation.

I’m pretty sure you have already installed gcloud, but let me mention a few handy commands for switching between projects. If you already use gcloud configurations, please skip this part.

To create a new configuration with the name dataform run this command:

gcloud config configurations create dataform

The new configuration will be created and activated; now we should initialize it:

gcloud init

It will ask you to pick the next step; select 1:

[1] Re-initialize this configuration [dataform] with new settings.

Then it asks you to choose or log in to an account; if it’s your first configuration, you have to log in.

Then you have to choose or create a project; select the existing one that contains your Dataform code.

Finally, it will ask whether you want to set a default zone for your project.

Congratulations, your first configuration is created. Now you can easily switch between configurations, like this:

gcloud config configurations activate [configuration name]

Or list all existing configurations:

gcloud config configurations list

Now you don’t need to specify a project in your gcloud commands, and you can easily get the project settings, for example the project ID:

gcloud info --format='value(config.project)'

We will use environment variables in our gcloud commands; with them, it will be easier to reuse the commands across projects.

Let’s define a few variables:

export project_id=$(gcloud info --format='value(config.project)')
export repository_id=dataform-ga4events
export region=us-central1

Dataform Repository Name

  • repository_id is the name of your Dataform repository (not your GitHub repository); you can get it from the page in the screenshot above.
  • region is the default Dataform region; you can also get the needed location from the screenshot above. We will use this location for all the Google Cloud services we enable later.

To check that you did everything right, you can run the echo command:

echo ${project_id}

Create Pub/Sub topics

According to our plan, we need two Pub/Sub topics; let’s create them. First, define variables with their names:

export ga4_table_updated_topic=ga4-table-updated-topic
export dataform_run_topic=dataform-run-topic

And run these gcloud commands:

gcloud pubsub topics create ${ga4_table_updated_topic}
gcloud pubsub topics create ${dataform_run_topic}

Open the Pub/Sub page of your project and check the list of topics; you should see something like this:

PubSub Topics

Later we will return to this page to check that we granted all the required permissions.

Create logging sink

This is the sad part of the tutorial, the hopeless one, because Google doesn’t update its test dataset. So if you want to test this part, you have to replace Google’s dataset with your real GA4 dataset.

Before creating a sink, let’s find the log records about GA4 daily table creation. Open the Logs Explorer and try this query:

protoPayload.methodName="jobservice.jobcompleted"
protoPayload.authenticationInfo.principalEmail="firebase-measurement@system.gserviceaccount.com"
protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.load.destinationTable.datasetId="DATASET"
protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.load.destinationTable.tableId=~"^events\_\d+"

As you can see, we filter by principalEmail; the GA4 BigQuery export should always use the account firebase-measurement@system.gserviceaccount.com.

Please also change DATASET to your GA4 dataset name, something like analytics_311287033. And because all daily tables have names like events_20230415, we can use the regexp ^events\_\d+ for the tableId.
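
If you want to sanity-check the pattern locally before putting it into a sink, a quick Python one-off (not part of the repository) does the job:

import re

# Same idea as the log filter: daily tables only, so events_20230415 matches
# but events_intraday_20230415 does not.
daily_table = re.compile(r"^events_\d+")

print(bool(daily_table.match("events_20230415")))           # True
print(bool(daily_table.match("events_intraday_20230415")))  # False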

Copy-paste the query into the Logs Explorer, change the date range from Last 1 hour to Last 2 days, and click Run query:

Log Explorer

You should see a few records. Select any row, click the Collapse nested fields button, and review the log data.

Log Explorer Collapse nested fields

You may also notice that sometimes there is more than one event per day, but we are prepared: in the previous post we defined incremental Dataform models with unique keys.

One small note: in this query we can trust, as it was suggested by Simo in his article How Do I Trigger A Scheduled Query.

Now we are ready to create the logging sink. To create a sink, we need to set:

  • Sink name
  • Sink destination
  • Log filter

For the Pub/Sub destination we will use this string:

pubsub.googleapis.com/projects/${project_id}/topics/${ga4_table_updated_topic}

For the filter we can use the query from above and define it in a variable:

export sync_filter="protoPayload.methodName=\"jobservice.jobcompleted\" protoPayload.authenticationInfo.principalEmail=\"firebase-measurement@system.gserviceaccount.com\"
protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.load.destinationTable.datasetId=\"${dataset}\"
protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.load.destinationTable.tableId=~\"^events\_\d+\""

Just notice that we use a dataset variable in the filter definition (set it to your GA4 dataset name first, for example export dataset=analytics_311287033) and escape all quotes with backslashes.

OK, go ahead and create the sink:

gcloud logging sinks create ga4-table-updated-sink \
pubsub.googleapis.com/projects/${project_id}/topics/${ga4_table_updated_topic} \
--log-filter="${sync_filter}"

You will get this warning:

Please remember to grant serviceAccount:p855840598401-539779@gcp-sa-logging.iam.gserviceaccount.com the Pub/Sub Publisher role on the topic.

We are not going to disappoint Google and will grant the permission in two steps. First, get the writerIdentity account:

gcloud logging sinks describe --format='value(writerIdentity)' ga4-table-updated-sink

Here we use the describe command together with --format; it’s a handy trick we will use often, and not only for sinks. Remember, ga4-table-updated-sink is the name of the sink from the previous step.

We can store this value in a variable like this:

export sink_writer_identity=$(gcloud logging sinks describe --format='value(writerIdentity)' \
ga4-table-updated-sink)

The second step is adding the permission:

gcloud pubsub topics add-iam-policy-binding ${ga4_table_updated_topic} \
--member="${sink_writer_identity}" --role="roles/pubsub.publisher"

As usual, let’s check the result. On the Log Router page you should see something like this:

Log Router

There are a few default sinks, plus ours. You can click the three dots for our sink and choose View sink details to check the destination and the logging filter, or click Troubleshoot sink to test the filter in the Logs Explorer.

Next, go to the Pub/Sub Topics page, click ga4-table-updated-topic, and in the Permissions tab open the Pub/Sub Publisher role and check the account list:

GCP Pubsub Permissions

If you don’t see this panel, click the Show info panel button in the top right corner:

Pubsub Show Info Panel

Cloud function for sink events

The next step is to create the function that extracts all the needed information from the log and generates a new Pub/Sub event.

Clone the repo with the functions:

git clone https://github.com/ArtemKorneevGA/dataform-cloud-functions.git

Go to the function’s folder:

cd ./ga4-table-updated-pubsub-event-func

Let’s review the function code before deploying it. Have a look at main.py.

All the logic is in the main function. First, we get the log_entry sent by the sink:

data_buffer = base64.b64decode(cloud_event.data["message"]["data"])
log_entry = json.loads(data_buffer)

In the second step, we prepare the config:

config["last_event_table"] = log_entry["protoPayload"]["serviceData"]\
["jobCompletedEvent"]["job"]["jobConfiguration"]["load"]\
["destinationTable"]["tableId"]

config["dataset_id"] = log_entry["protoPayload"]["serviceData"]\
["jobCompletedEvent"]["job"]["jobConfiguration"]\
["load"]["destinationTable"]["datasetId"]

config["tags"] = [config["dataset_id"]]

We get last_event_table and dataset_id from log_entry. As you can see, we use the same “path” log_entry["protoPayload"]["serviceData"]["jobCompletedEvent"]["job"]["jobConfiguration"]["load"]["destinationTable"]["datasetId"] as in the sink filter:

Check sink filter
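
The sink filter already guarantees these fields exist, so the direct indexing above is fine. If you prefer the function to fail more gracefully on unexpected payloads, a small helper like this (my own sketch, not part of the repository) avoids a raw KeyError:

def nested_get(entry, *keys):
    """Walk a nested dict and return None if any key is missing."""
    for key in keys:
        if not isinstance(entry, dict) or key not in entry:
            return None
        entry = entry[key]
    return entry

# Example with a truncated log entry:
log_entry = {"protoPayload": {"serviceData": {"jobCompletedEvent": {}}}}
table_id = nested_get(log_entry, "protoPayload", "serviceData", "jobCompletedEvent",
                      "job", "jobConfiguration", "load", "destinationTable", "tableId")
print(table_id)  # None instead of a KeyError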

In the last step, we create a new Pub/Sub event:

client = pubsub_v1.PublisherClient()

topic_id = os.environ.get("topic_id")
topic_path = client.topic_path(config["project_id"], topic_id)

response = client.publish(topic_path, json.dumps(config).encode("utf-8"))

We get a Pub/Sub client, generate the topic_path from project_id and the topic_id environment variable, and publish the config to this topic.
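
One caveat worth keeping in mind: publish() in the Pub/Sub client returns a future, so if you want to be sure the message has left before the function exits, block on the result. A minimal standalone sketch (with placeholder project and topic names):

import json
from google.cloud import pubsub_v1

client = pubsub_v1.PublisherClient()
# Placeholder project and topic; replace with your own values.
topic_path = client.topic_path("my-gcp-project", "dataform-run-topic")

future = client.publish(topic_path, json.dumps({"dataset_id": "analytics_123456"}).encode("utf-8"))
message_id = future.result(timeout=30)  # blocks until Pub/Sub confirms the publish
print(f"Published message {message_id}")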

The function expects these environment variables:

  • project_id
  • region
  • repository_id
  • git_commitish
  • topic_id (we will set it to the Dataform run topic)

We have already defined most of them in the previous steps; the only missing one is git_commitish, the branch to compile, so define it now (export git_commitish=main). Now we are ready to deploy the function using the gcloud functions deploy command, with --trigger-resource and --trigger-event pointing at the Pub/Sub events generated by the sink, and all the needed variables passed via --set-env-vars. Like this:

gcloud functions deploy ga4-table-updated-pubsub-event-func --max-instances 1 \ 
--entry-point main --runtime python39 --trigger-resource ${ga4_table_updated_topic} \
--trigger-event google.pubsub.topic.publish --region ${region} --timeout 540s \
--set-env-vars project_id=${project_id},region=${region},repository_id=${repository_id},git_commitish=${git_commitish},topic_id=${dataform_run_topic}

If it’s the first Cloud Function in your project, you will see:

API [cloudfunctions.googleapis.com] not enabled on project [ID]. Would you like to enable and retry (this will take a few minutes)? (y/N)?

Type Y and wait a few minutes while the function deploys.

You can check the functions list; the new function should be green:

Cloud function to send pubsub event

One more important step: we should grant our function permission to publish to the Pub/Sub topic.

First, we need the function’s service account, and again the describe command comes to the rescue:

gcloud functions describe --format='value(serviceAccountEmail)' \
ga4-table-updated-pubsub-event-func

Set the environment variable:

export function_service_account=$(gcloud functions describe \ 
--format='value(serviceAccountEmail)' ga4-table-updated-pubsub-event-func)

Then set the permission as we did for the sink, but this time for the function’s service account:

gcloud pubsub topics add-iam-policy-binding ${dataform_run_topic} \ 
--member="serviceAccount:${function_service_account}" --role="roles/pubsub.publisher"

Open the dataform-run-topic page and check who has the Pub/Sub Publisher role.

Check pubsub publisher role

Everything looks good, but unfortunately we can’t test this function yet, as Google doesn’t update the GA4 test dataset.

Cloud function for Dataform run

Dataform API documentation (or its absence) makes this part the most interesting.

But thanks to open source, the world is a beautiful place to live in: we can look inside the Airflow module for Dataform, use its code as a starting point, and add a few missing features.

We will run Dataform in two steps:

  1. create_compilation_result
  2. create_workflow_invocation

In the first step, we pass:

  • git_commitish is the branch name, usually main. This way we can do all our experiments in separate dev branches and push tested results to main.
  • GA4_TABLE is a configuration variable with the updated table name.

In the second step, we invoke the compilation result and pass the tag equal to dataset_id. If you remember, earlier in the definitions/staging/events/declarations.js file we set tags:["ga4_obfuscated_sample_ecommerce"]. As the tags match, we will execute all the models from that file.

If you have multiple GA4 datasets in one project, then as soon as one of the GA4 tables is updated, we will run the Dataform models only for that dataset (based on the tags); the actions for the other datasets won’t be executed.

Let’s review the code. Go to the second function’s folder:

cd ../ga4-table-updated-dataform-run-func/

In main.py, we again first parse the incoming event:

data = base64.b64decode(cloud_event.data["message"]["data"]).decode()
message = json.loads(data)

Then we create the compilation config (compilation_result), setting the branch name and the global variables from the event:

compilation_result = {
    "git_commitish": config.git_commitish,
}
if config.last_event_table:
    compilation_result["code_compilation_config"] = {
        "vars": {
            "GA4_TABLE": config.last_event_table,
        }
    }

And create the compilation result:

result = client.create_compilation_result(
    request={
        "parent": parent,
        "compilation_result": compilation_result,
    }
)

Next, we pass the tags in the invocation config:

workflow_invocation = {"compilation_result": result.name}
if config.tags:
    workflow_invocation["invocation_config"] = {"included_tags": config.tags}

And create the invocation:

workflow_invocation_result = client.create_workflow_invocation(
    request=workflow_invocation_request,
)

Thanks to Airflow one more time.
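
To make the two steps easier to follow, here is a condensed end-to-end sketch of the same flow using the google-cloud-dataform client. It is my own summary, not a copy of the repository code; the project ID is a placeholder, while the repository, region, tag, and variable values are the ones used throughout this post:

from google.cloud import dataform_v1beta1

client = dataform_v1beta1.DataformClient()

project_id = "my-gcp-project"  # placeholder project ID
region = "us-central1"
repository_id = "dataform-ga4events"
parent = f"projects/{project_id}/locations/{region}/repositories/{repository_id}"

# Step 1: compile the main branch, overriding GA4_TABLE with the updated table name.
compilation = client.create_compilation_result(
    request={
        "parent": parent,
        "compilation_result": {
            "git_commitish": "main",
            "code_compilation_config": {"vars": {"GA4_TABLE": "events_20210131"}},
        },
    }
)

# Step 2: invoke the compiled actions, filtered by tag.
invocation = client.create_workflow_invocation(
    request={
        "parent": parent,
        "workflow_invocation": {
            "compilation_result": compilation.name,
            "invocation_config": {"included_tags": ["ga4_obfuscated_sample_ecommerce"]},
        },
    }
)
print(invocation.name)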

And finally, we can deploy our function:

gcloud functions deploy ga4-table-updated-dataform-run-func --max-instances 1 \
--entry-point main --runtime python39 --trigger-resource ${dataform_run_topic} \
--trigger-event google.pubsub.topic.publish --region ${region} --timeout 540s

Again we use the gcloud functions deploy command, this time with ${dataform_run_topic} as the trigger resource.

Let’s check the list of functions one more time; all functions should be green:

Cloud function to run dataform

This time we can test our second function. Go to the dataform-run-topic Pub/Sub topic, open the Messages tab, and click the Publish Message button:

Pubsub Publish Message

Put this object into the message body (don’t forget to change <project_id> to your project):

{
    "project_id": "<project_id>",
    "region": "us-central1",
    "repository_id": "dataform-ga4events",
    "git_commitish": "main",
    "tags": ["ga4_obfuscated_sample_ecommerce"],
    "dataset_id": "ga4_obfuscated_sample_ecommerce",
    "last_event_table": "events_20210131"
}

Pubsub Publish Message

Now let’s check the result: open the ga4-table-updated-dataform-run-func Logs tab.

The last log record should be something like this:

Function execution took 21865 ms, finished with status: ‘ok’

GCP Cloud Function Log

Then go to the Dataform workflow execution logs:

Dataform Execution Log

Here we can see the last execution from the main branch with the tag ga4_obfuscated_sample_ecommerce, but with an error status. If we click on this row, we can find the reason:

Dataform Log Errors

Our intraday models returned errors because the test dataset has no intraday tables. But if you test on real data, all the actions should be green.

Hourly run

The last exercise in this post: let’s run an intraday update every hour. We will create a scheduler job that sends messages like this to the dataform-run-topic Pub/Sub topic:

{
    "project_id": "<project_id>",
    "region": "us-central1",
    "repository_id": "dataform-ga4events",
    "git_commitish": "main",
    "tags": ["ga4_hourly"]
}

As you can see, we don’t specify dataset_id and last_event_table, as we want to execute all GA4 models with the ga4_hourly tag.

First, define the message:

export message_body="{\"project_id\":\"${project_id}\",
\"region\":\"${region}\",\"repository_id\": \"${repository_id}\",
\"git_commitish\": \"${git_commitish}\",\"tags\":[\"ga4_hourly\"]}"

We reuse the existing variables and set the tag to ga4_hourly.

Next, create the job using the gcloud scheduler jobs create pubsub command:

gcloud scheduler jobs create pubsub ga4-hourly --location ${region} \
--schedule "0 * * * *" --topic ${dataform_run_topic} --message-body "${message_body}"

The important parameters here are --schedule, which takes a cron expression ("0 * * * *" means at minute 0 of every hour), --topic with our Pub/Sub topic, and --message-body with the message defined above.

If it’s your first scheduler job in the project, you will get this prompt:

API [cloudscheduler.googleapis.com] not enabled on project []. Would you like to enable and retry (this will take a few minutes)? (y/N)?

Type Y, and in a few moments the job will be created.

Let’s check the job. Go to the Cloud Scheduler page; you should see something like this:

Cloud Scheduler

To test it, click the three dots in the job’s row and, in the dropdown menu, click Force run.

As usual, you can check the Cloud Functions logs and the Dataform workflow execution logs:

Cloud Scheduler

You should see an execution with the ga4_hourly tag, but again with an error status, as we don’t have an intraday GA4 table.

Remember to pause or delete the scheduler job after all your tests.

Final thoughts

Thanks for reading! In this post we used a few core Google Cloud services and played a bit with the gcloud CLI. Now we are ready to run our GA4 pipelines as soon as somebody decides to buy one more Google t-shirt.

Dataform ready to process GA4 events

The next post should be about monitoring and Slack and Teams notifications.

Please message me on LinkedIn if you have any questions or comments.

PS A few useful links: