Let's try: Apache Beam part 4 - live on Google Dataflow
We have experimented with our Apache Beam pipeline on local machines. This is a good time to see how we can run an Apache Beam project in the cloud.
Of course, the cloud here is Google Cloud Platform. Yes, we're talking about Google Dataflow.
What is Google Dataflow?
Google Dataflow is a GCP service that runs Apache Beam pipelines on provisioned VM instances acting as worker nodes. We can also configure the instances: how much RAM and CPU we need, networking, and much more.
We can deploy our Beam pipeline to Dataflow in various ways. We can also integrate the pipeline with other GCP services, such as processing files in Google Cloud Storage, receiving messages from Google Pub/Sub, and writing processed data into Google Cloud Firestore.
However, today we will stick to a simple pipeline like the ones in the previous parts: read and write a simple file.
Runner
The core concept is here. When we use Google Dataflow, we have to use its dedicated runner, `DataflowRunner`. To use it, we specify `DataflowRunner` either in `beam.Pipeline()` or in the `python -m` command.
We also have to enable the Dataflow API beforehand.
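With the gcloud CLI, enabling it can be done like this (assuming gcloud is already authenticated against the right project):

```bash
# Enable the Dataflow API for the currently configured project.
gcloud services enable dataflow.googleapis.com
```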
Let's see some examples.
Example 1: Direct from local
Let's start with a basic one.
Prepare files
Say we have this input. The pipeline should output only the "F" people.
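The original sample file isn't reproduced here; a hypothetical input in the same shape could look like this (columns assumed):

```
id,name,gender
1,Alice,F
2,Bob,M
3,Carol,F
```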
In `run_beam()`, there are the GCS files we want to read and write (lines 30-31), and we specify the runner as `DataflowRunner` at line 37.
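Since the original listing isn't reproduced here, below is a minimal sketch of what such a `run_beam()` can look like. The bucket paths and column names are placeholders, and the line numbers differ from the real file; the `mapToDict()` helper follows the name mentioned later in this post.

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder GCS paths; use your own bucket and files.
INPUT_FILE = "gs://YOUR_BUCKET/input/people.csv"
OUTPUT_FILE = "gs://YOUR_BUCKET/output/people_f"


def mapToDict(line: str) -> dict:
    # Imported inside the function so it is available on the Dataflow workers.
    import csv
    return next(csv.DictReader([line], fieldnames=["id", "name", "gender"]))


def run_beam(argv=None):
    # The runner is fixed in code; the other options (--project, --region,
    # --temp_location, ...) are picked up from the command line.
    options = PipelineOptions(argv, runner="DataflowRunner")
    with beam.Pipeline(options=options) as p:
        (
            p
            | "Read CSV" >> beam.io.ReadFromText(INPUT_FILE, skip_header_lines=1)
            | "To dict" >> beam.Map(mapToDict)
            | "Keep F only" >> beam.Filter(lambda row: row["gender"] == "F")
            | "Format" >> beam.Map(lambda row: ",".join(row.values()))
            | "Write" >> beam.io.WriteToText(OUTPUT_FILE, file_name_suffix=".csv")
        )


if __name__ == "__main__":
    run_beam()
```

With the runner fixed in code, the remaining options can still come from the command line, which is exactly what the next step does.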
Run it
To run it on Dataflow, we can execute this command in the terminal.
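A hedged example of what that command can look like, assuming the pipeline file is named "main.py" and using placeholder project and bucket names:

```bash
python -m main \
  --region europe-west1 \
  --runner DataflowRunner \
  --project YOUR_PROJECT \
  --temp_location gs://YOUR_BUCKET/temp
```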
- flag `--region` is the region of the Dataflow job, whichever we want.
- flag `--runner` is `DataflowRunner` here.
- flag `--project` is the project of the job.
- flag `--temp_location` is a GCS folder for storing temporary files generated by the Dataflow job.
Check results
Wait for some time and we can see green lights like this.
You might notice the code has `import csv` inside the function `mapToDict()`. I had originally imported it at the top of the file and got an error, which is how I learned that a library imported at module level is not available on the Dataflow worker nodes.
We can check if the files are ready.
And the content is correct.
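For a quick check from the terminal, assuming the gsutil CLI and the placeholder paths from the sketch above:

```bash
# List the output shards, then print their content.
gsutil ls gs://YOUR_BUCKET/output/
gsutil cat gs://YOUR_BUCKET/output/people_f-*.csv
```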
Example 2: Container image
The direct deployment above is fine for small code bases, but when it comes to bigger ones, that method stops working well. We consider using a container image to pack the source code in one place and make it easy to deploy.
Prepare files
Prepare folder structure like this.
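The original structure isn't reproduced here; a layout in this spirit (folder and file names assumed) would be:

```
sample-beam/
├── Dockerfile
├── requirements.txt
├── main.py
└── CSVToDict.py
```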
A file "CSVToDict.py".
A file “main.py”. This time we want only “M” people.
And Dockerfile.
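The original Dockerfile isn't reproduced here; a minimal sketch based on the official Beam Python SDK base image could look like this (the version tag is an assumption, so match it to your local Beam and Python versions):

```dockerfile
# The base image ships the Beam SDK and the worker boot entrypoint.
FROM apache/beam_python3.10_sdk:2.53.0

WORKDIR /app

# Install extra dependencies, if any (requirements.txt may be empty).
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the pipeline source so workers can import it.
COPY main.py CSVToDict.py ./
ENV PYTHONPATH=/app
```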
Now we have a blank “requirements.txt” because we don’t have any external library to use at this time.
Prepare a container image
Now we have to build an image and upload it to Google Artifact Registry.
We need to create a repo in Google Artifact Registry first.
```bash
gcloud artifacts repositories create sample-beam \
    --repository-format=docker \
    --location=europe-west1 \
    --async
```
Run it
Then we can build and push the image, and create a Beam pipeline based on it.
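A hedged walk-through of those steps, with placeholder project, image, and bucket names (the repo is the "sample-beam" one created above):

```bash
# Allow Docker to push to Artifact Registry in this region (one-time setup).
gcloud auth configure-docker europe-west1-docker.pkg.dev

# Build and push the image.
docker build -t europe-west1-docker.pkg.dev/YOUR_PROJECT/sample-beam/beam-job:latest .
docker push europe-west1-docker.pkg.dev/YOUR_PROJECT/sample-beam/beam-job:latest

# Launch the pipeline on Dataflow, running workers from that image.
python -m main \
  --region europe-west1 \
  --runner DataflowRunner \
  --project YOUR_PROJECT \
  --temp_location gs://YOUR_BUCKET/temp \
  --experiments=use_runner_v2 \
  --sdk_container_image=europe-west1-docker.pkg.dev/YOUR_PROJECT/sample-beam/beam-job:latest
```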
- flag `--region` is the region of the Dataflow job, whichever we want.
- flag `--runner` is `DataflowRunner` here.
- flag `--project` is the project of the job.
- flag `--temp_location` is a GCS folder for storing temporary files generated by the Dataflow job.
- flag `--experiments=use_runner_v2` enables Dataflow Runner V2.
- flag `--sdk_container_image` is the container image of this Beam job.
Check results
The Dataflow job is green like this.
We can find the image of this pipeline by checking "sdk_container_image" under "Pipeline options" at the bottom right.
And check whether the output file has only "M" people. Yes, it does.
Read more about Google Artifact Registry here.
Example 3: Container image with parameters
For some reason, we may need to pass in file paths instead of hardcoding them. We would consider `argparse`, a library for managing inputs from the command line.
Prepare files
We have updated "main.py" a bit to get the file paths from input arguments (lines 12-17) and select only people whose "id" is odd (line 37).
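A minimal sketch of that idea is below; the column names are placeholders and the line numbers differ from the real file. Anything `argparse` doesn't recognize is handed over to Beam as pipeline options.

```python
import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def mapToDict(line: str) -> dict:
    # Imported inside the function so it is available on the Dataflow workers.
    import csv
    return next(csv.DictReader([line], fieldnames=["id", "name", "gender"]))


def run_beam(argv=None):
    # Our custom parameters; everything Beam understands (--runner, --project,
    # --region, --temp_location, ...) stays in beam_args.
    parser = argparse.ArgumentParser()
    parser.add_argument("--input_file", required=True, help="GCS path of the input CSV")
    parser.add_argument("--output_file", required=True, help="GCS path prefix for the output")
    args, beam_args = parser.parse_known_args(argv)

    options = PipelineOptions(beam_args)
    with beam.Pipeline(options=options) as p:
        (
            p
            | "Read CSV" >> beam.io.ReadFromText(args.input_file, skip_header_lines=1)
            | "To dict" >> beam.Map(mapToDict)
            | "Odd ids only" >> beam.Filter(lambda row: int(row["id"]) % 2 == 1)
            | "Format" >> beam.Map(lambda row: ",".join(row.values()))
            | "Write" >> beam.io.WriteToText(args.output_file, file_name_suffix=".csv")
        )


if __name__ == "__main__":
    run_beam()
```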
Run it
Do the same: build and push an image, then run Beam from it.
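The launch command then gains the two custom flags (same placeholders as before):

```bash
python -m main \
  --input_file gs://YOUR_BUCKET/input/people.csv \
  --output_file gs://YOUR_BUCKET/output/people_odd \
  --region europe-west1 \
  --runner DataflowRunner \
  --project YOUR_PROJECT \
  --temp_location gs://YOUR_BUCKET/temp \
  --experiments=use_runner_v2 \
  --sdk_container_image=europe-west1-docker.pkg.dev/YOUR_PROJECT/sample-beam/beam-job:latest
```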
- flag `--region` is the region of the Dataflow job, whichever we want.
- flag `--runner` is `DataflowRunner` here.
- flags `--input_file` and `--output_file` are our custom parameters.
- flag `--project` is the project of the job.
- flag `--temp_location` is a GCS folder for storing temporary files generated by the Dataflow job.
- flag `--experiments=use_runner_v2` enables Dataflow Runner V2.
- flag `--sdk_container_image` is the container image of this Beam job.
Check results
Now we can see the Beam job run to completion. As before, we can spot "input_file" and "output_file" under "Pipeline options".
And the output file shows only people with odd ids.
Read more about `argparse` here.
Repo
You can check out my repo for this part here.