We have experimented with our Apache Beam pipelines on local machines. Now is a good time to see how we can run an Apache Beam project in the cloud.

Of course, the cloud here is Google Cloud Platform. Yes, we're talking about Google Dataflow.


What is Google Dataflow?

Google Dataflow is a GCP service that runs Apache Beam pipelines on provisioned VM instances acting as worker nodes. We can also configure those instances: how much RAM and CPU we need, networking, and much more.

We can deploy our Beam pipeline to Dataflow in various ways, and we can integrate the pipeline with other GCP services, such as processing files in Google Cloud Storage, receiving messages from Google Pub/Sub, and writing processed data into Google Cloud Firestore.

However, today we will stick to a simple pipeline like the ones in the previous parts: reading and writing a simple file.


Runner

The core concept is here. When we use Google Dataflow, we have to use its dedicated runner: DataflowRunner.

To use DataflowRunner, we need to specify it either in the pipeline options passed to beam.Pipeline() or as a flag of the python -m command.
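For instance, a minimal sketch (not the exact code of this post) of setting the runner through pipeline options in code could look like this, with placeholder project, region, and bucket names:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder values -- replace with your own project, region, and bucket.
options = PipelineOptions(
    runner="DataflowRunner",
    project="my-project-id",
    region="europe-west1",
    temp_location="gs://my-bucket/temp/",
)

with beam.Pipeline(options=options) as pipeline:
    (
        pipeline
        | "Create" >> beam.Create(["hello", "dataflow"])
        | "Print" >> beam.Map(print)
    )

The same options can instead be passed as flags on the python -m command line; PipelineOptions picks up command-line arguments when values are not set in code.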

We also have to enable the Dataflow API beforehand.

Let's see some examples.


Example 1: Direct from local

Let's start with a basic one.

Prepare files

Say we have this input file. The pipeline should output only the "F" people.

In run_beam(), we point the reads and writes at files in GCS, and we specify the runner as DataflowRunner.
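The full script lives in the post's repo; below is only a minimal sketch of what run_beam() could look like, assuming the CSV has a header row with id, name, and gender columns and using placeholder GCS paths, project, and region:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def mapToDict(line):
    # imported inside the function so it is available on the Dataflow workers
    import csv
    # assuming columns: id, name, gender
    fields = next(csv.reader([line]))
    return {"id": fields[0], "name": fields[1], "gender": fields[2]}


def run_beam():
    # placeholder GCS paths -- the real ones are in the repo
    input_file = "gs://my-bucket/input/people.csv"
    output_file = "gs://my-bucket/output/result"

    options = PipelineOptions(
        runner="DataflowRunner",
        project="my-project-id",
        region="europe-west1",
        temp_location="gs://my-bucket/temp/",
    )

    with beam.Pipeline(options=options) as p:
        (
            p
            | "Read CSV" >> beam.io.ReadFromText(input_file, skip_header_lines=1)
            | "To dict" >> beam.Map(mapToDict)
            | "Keep F only" >> beam.Filter(lambda row: row["gender"] == "F")
            | "Format" >> beam.Map(str)
            | "Write" >> beam.io.WriteToText(output_file)
        )


if __name__ == "__main__":
    run_beam()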

Run it

To make it run on Dataflow, we can execute this kind of command in the terminal.
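Here is a rough sketch of such a command, assuming the pipeline module is main.py and using placeholder project, region, and bucket names:

python -m main \
   --region=europe-west1 \
   --runner=DataflowRunner \
   --project=my-project-id \
   --temp_location=gs://my-bucket/temp/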

  • flag --region is the region where we want the Dataflow job to run.
  • flag --runner is DataflowRunner here.
  • flag --project is the GCP project of the job.
  • flag --temp_location is a GCS folder for storing temporary files generated by the Dataflow job.

Check results

Wait for a while and we can see green lights like this.

You might notice the code has import csv inside the function mapToDict().

I had originally imported it at the top of the file and got an error; it turns out a library imported at module level is not available on the Dataflow worker nodes, so the import has to live inside the function.
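As an aside that is not from the original post: Beam also offers the save_main_session setup option, which pickles the main module's global state, including top-level imports, and ships it to the workers, so a top-level import csv would then work too. A sketch:

from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

# plus the usual project/region/temp_location options
options = PipelineOptions(runner="DataflowRunner")
# Ship the main session (globals, including top-level imports) to the workers.
options.view_as(SetupOptions).save_main_session = True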

We can check if the files are ready.

And the content is correct.


Example 2: Container image

The direct deployment above is fine for small code, but when it comes to bigger codebases, that method doesn't work so well.

So we consider using a container image to pack the source code in one place and make it easy to deploy.

Prepare files

Prepare a folder structure like this.

A file CSVToDict.

A file main.py. This time we want only "M" people.

And a Dockerfile.
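The Dockerfile is not reproduced here, but a sketch of what it could look like follows, using the common approach of basing a custom SDK container on an official Apache Beam Python SDK image. The Beam/Python versions and the CSVToDict.py file name are assumptions, not necessarily what the repo uses.

# Base image: an official Apache Beam Python SDK image (match versions to your setup).
FROM apache/beam_python3.10_sdk:2.48.0

WORKDIR /pipeline

# Install extra dependencies, if any (requirements.txt is blank in this example).
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the pipeline source code into the image.
COPY main.py CSVToDict.py ./

The base image already provides the Beam SDK entrypoint that Dataflow workers expect, so we only add our code and dependencies on top.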

Now we have a blank requirements.txt because we don't have any external libraries to use at this time.

Prepare a container image

Now we have to build an image and upload it to Google Artifact Registry.

We need to create a repo in Google Artifact Registry first.

gcloud artifacts repositories create sample-beam \
   --repository-format=docker \
   --location=europe-west1 \
   --async

Run it

Then we can build and push the image and create a Beam pipeline based on the image.
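In sketch form, with placeholder names and the Artifact Registry image path following the LOCATION-docker.pkg.dev/PROJECT/REPO/IMAGE convention:

# Authenticate Docker against Artifact Registry if not done already.
gcloud auth configure-docker europe-west1-docker.pkg.dev

# Build and push the image into the repo created above.
docker build -t europe-west1-docker.pkg.dev/my-project-id/sample-beam/beam-job:latest .
docker push europe-west1-docker.pkg.dev/my-project-id/sample-beam/beam-job:latest

# Launch the pipeline on Dataflow using that image as the SDK container.
python -m main \
   --region=europe-west1 \
   --runner=DataflowRunner \
   --project=my-project-id \
   --temp_location=gs://my-bucket/temp/ \
   --experiments=use_runner_v2 \
   --sdk_container_image=europe-west1-docker.pkg.dev/my-project-id/sample-beam/beam-job:latest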

  • flag --region is the region where we want the Dataflow job to run.
  • flag --runner is DataflowRunner here.
  • flag --project is the GCP project of the job.
  • flag --temp_location is a GCS folder for storing temporary files generated by the Dataflow job.
  • flag --experiments=use_runner_v2 enables Dataflow Runner v2.
  • flag --sdk_container_image is the container image for this Beam job.

Check results

The Dataflow job is green like this.

We can find the image of this pipeline by checking "sdk_container_image" under "Pipeline options" at the bottom right.

And check the output file to confirm it has only "M" people. Yes, it does.

Read more about Google Artifact Registry here.

A private repo for our own Python packages
Google Artifact Registry is a service from Google to store images, Python and NodeJS modules, and much more.

Example 3: Container image with parameters

For some reason, we may need to pass in the file paths instead of hardcoding them.

We can use argparse, a library for managing inputs from the command line.

Prepare files

We update main.py a bit to get the file paths from input arguments and to select only the people whose "id" is odd.
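The real code is in the repo; here is a minimal sketch of how main.py could combine argparse with Beam's pipeline options, assuming the custom flags are named --input_file and --output_file and that the id column is an integer:

import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def mapToDict(line):
    import csv  # imported inside the function for the Dataflow workers
    fields = next(csv.reader([line]))
    # assuming columns: id, name, gender
    return {"id": int(fields[0]), "name": fields[1], "gender": fields[2]}


def run_beam(argv=None):
    # Parse our custom flags; everything else is passed through to Beam.
    parser = argparse.ArgumentParser()
    parser.add_argument("--input_file", required=True, help="GCS path of the input CSV")
    parser.add_argument("--output_file", required=True, help="GCS path prefix for the output")
    known_args, pipeline_args = parser.parse_known_args(argv)

    options = PipelineOptions(pipeline_args)

    with beam.Pipeline(options=options) as p:
        (
            p
            | "Read CSV" >> beam.io.ReadFromText(known_args.input_file, skip_header_lines=1)
            | "To dict" >> beam.Map(mapToDict)
            | "Keep odd ids" >> beam.Filter(lambda row: row["id"] % 2 == 1)
            | "Format" >> beam.Map(str)
            | "Write" >> beam.io.WriteToText(known_args.output_file)
        )


if __name__ == "__main__":
    run_beam()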

Run it

Do the same as before: build and push an image, then run the Beam job from it.
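A sketch of those commands, with the two custom flags added and placeholder names as before:

# Rebuild and push the updated image.
docker build -t europe-west1-docker.pkg.dev/my-project-id/sample-beam/beam-job:latest .
docker push europe-west1-docker.pkg.dev/my-project-id/sample-beam/beam-job:latest

# Launch the job, passing the file paths as custom parameters.
python -m main \
   --region=europe-west1 \
   --runner=DataflowRunner \
   --input_file=gs://my-bucket/input/people.csv \
   --output_file=gs://my-bucket/output/result \
   --project=my-project-id \
   --temp_location=gs://my-bucket/temp/ \
   --experiments=use_runner_v2 \
   --sdk_container_image=europe-west1-docker.pkg.dev/my-project-id/sample-beam/beam-job:latest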

  • flag --region is the region where we want the Dataflow job to run.
  • flag --runner is DataflowRunner here.
  • flag --input_file and --output_file are our custom parameters.
  • flag --project is the GCP project of the job.
  • flag --temp_location is a GCS folder for storing temporary files generated by the Dataflow job.
  • flag --experiments=use_runner_v2 enables Dataflow Runner v2.
  • flag --sdk_container_image is the container image for this Beam job.

Check results

Now we can see the Beam job run to completion. As before, we can spot "input_file" and "output_file" under "Pipeline options".

And the output file shows only the people with odd ids.

Read more about Argparse here.

argparse - next level Python parameterization
argparse is a module to help us deal with inputs and flags. Besides, it helps with handling tooltips and alerts.

Git Repo

You can check my repo of this part here.

sample-beam/04-dataflow at main · bluebirz/sample-beam
Contribute to bluebirz/sample-beam development by creating an account on GitHub.

References