Apache Beam has many transforms out of the box. In this blog we are going to see which built-in functions we can apply to our problems, so we don't waste time rewriting them ourselves.


Preparations

In this blog we will use a CSV file of 30 people.

It contains ID, name, gender, occupation, team, and age.

We also prepare the same method as before to transform each CSV row into a dict.
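Here is a minimal sketch of that method, assuming the column names above and a header row in the CSV; the class name CSVToDict and the column order are mine for illustration.

```python
import csv

import apache_beam as beam

# Column names assumed from the description above; adjust to your CSV header.
COLUMNS = ["id", "name", "gender", "occupation", "team", "age"]


class CSVToDict(beam.DoFn):
    """Parse one CSV line into a dict keyed by COLUMNS."""

    def process(self, element):
        for row in csv.reader([element]):
            yield dict(zip(COLUMNS, row))
```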


Example 1: Women in teams

We want to see the list of women in each team.

Steps of thought:

  1. Read the file and apply beam.ParDo() to transform each line into a dict.
  2. Apply beam.Filter() to keep only women.
  3. Apply beam.GroupBy() to group them by "team".
  4. Apply beam.Map() with a custom function custom_print() that displays each team and the people in it.
  • beam.GroupBy() requires a function that tells which property of an element is the key to group by.
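A minimal sketch of this pipeline, reusing the CSVToDict DoFn from the preparations; the file name people.csv and the gender value "female" are assumptions:

```python
import apache_beam as beam


def custom_print(element):
    # After beam.GroupBy(), each element is (team, [person dicts]).
    team, people = element
    print(team, [person["name"] for person in people])


with beam.Pipeline() as p:
    (
        p
        | "read" >> beam.io.ReadFromText("people.csv", skip_header_lines=1)
        | "to dict" >> beam.ParDo(CSVToDict())
        | "women only" >> beam.Filter(lambda person: person["gender"].lower() == "female")
        | "group by team" >> beam.GroupBy(lambda person: person["team"])
        | "print" >> beam.Map(custom_print)
    )
```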

Example 2: How many men and women?

We want to see the number of men and women in the list.

Steps of thought:

  1. Read the file and apply beam.ParDo() to transform each line into a dict.
  2. Split them by "gender" using beam.Partition().
  3. For the male group, count it with beam.combiners.Count.Globally() and print the result.
  4. Do the same for the female group.
  • beam.Partition() requires a function that returns the partition number for each element.
    In this case, we derive the number from a list index via list.index(); here it is genders.index().
  • beam.combiners.Count.Globally() returns the number of elements in the PCollection.
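A minimal sketch of the branching pipeline, again reusing CSVToDict; the values in the genders list are assumptions about the data:

```python
import apache_beam as beam

genders = ["male", "female"]

with beam.Pipeline() as p:
    people = (
        p
        | "read" >> beam.io.ReadFromText("people.csv", skip_header_lines=1)
        | "to dict" >> beam.ParDo(CSVToDict())
    )

    # beam.Partition() splits one PCollection into len(genders) PCollections,
    # routing each element by the index the function returns.
    male_people, female_people = people | "split by gender" >> beam.Partition(
        lambda person, _: genders.index(person["gender"].lower()), len(genders)
    )

    (
        male_people
        | "count male" >> beam.combiners.Count.Globally()
        | "print male" >> beam.Map(lambda n: print(f"male: {n}"))
    )
    (
        female_people
        | "count female" >> beam.combiners.Count.Globally()
        | "print female" >> beam.Map(lambda n: print(f"female: {n}"))
    )
```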

We perform branching here by wrapping each branch in parentheses and starting it from a different PCollection produced in the previous step.

As we can see, we have the male_people and female_people PCollections after the 2nd PTransform. After that, we use each of them as the initial PCollection of a later block.

This branching can be rendered as a DAG; see part 2 for how to generate one.


Example 3: Counting occupations

We want to see the list of single-word occupation terms and their number of occurrences.

Steps of thought:

  1. Read the file and apply beam.ParDo() to transform each line into a dict.
  2. Split "occupation" into words with beam.FlatMap() and str.split().
  3. Strip all non-alphanumeric characters with beam.Regex.replace_all().
  4. Transform to lowercase with beam.Map().
  5. Keep only occupation vocabulary with beam.Regex.matches().
  6. Group and count occurrences using beam.combiners.Count.PerElement().
  • beam.FlatMap() accepts a function that returns an iterable, then flattens that iterable into the output PCollection.
    For example, the string "Administrator, charities/voluntary organisations" splits into ["Administrator,", "charities/voluntary", "organisations"], and each item becomes an element of the PCollection (the stray punctuation is removed in the next step).
  • beam.Regex.replace_all() replaces every occurrence matching a regex within an element with a given string.
    In this case we replace every non-alphanumeric character (r"[^\w\d]") with an empty string, which removes it.
  • beam.Regex.matches() filters elements based on a regex.
    In this case we accept words that start with four or more word characters and end with "ist", "er", "or", "ian", or "ant".
  • beam.combiners.Count.PerElement() returns a PCollection of unique elements paired with their number of occurrences.
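A minimal sketch of the whole chain, with the same CSVToDict and file-name assumptions as before; the suffix regex is my approximation of the pattern described above:

```python
import apache_beam as beam

with beam.Pipeline() as p:
    (
        p
        | "read" >> beam.io.ReadFromText("people.csv", skip_header_lines=1)
        | "to dict" >> beam.ParDo(CSVToDict())
        | "split words" >> beam.FlatMap(lambda person: person["occupation"].split())
        | "strip non-alphanumeric" >> beam.Regex.replace_all(r"[^\w\d]", "")
        | "lowercase" >> beam.Map(str.lower)
        | "occupation words" >> beam.Regex.matches(r"\w{4,}(ist|er|or|ian|ant)$")
        | "count" >> beam.combiners.Count.PerElement()
        | "print" >> beam.Map(print)
    )
```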

To read more about regex, follow the link below.

REGEX is sexy
Regular Expression is a tool for processing text. It’s popular in most programming languages.

Example 4: The oldest in a group

We want to see the age of the oldest person per team per gender.

Steps of thought:

  1. Read the file and apply beam.ParDo() to transform each line into a dict.
  2. Transform with beam.Map() into tuples of the form (team-gender, age).
  3. Aggregate with beam.CombinePerKey(), supplying max to return the maximum age of each group.
  • beam.CombinePerKey() requires a PCollection of key-value pairs.
    In this case the key is team-gender and the value is age; the transform groups equal keys together, executes max over the values, and returns each unique key with its maximum age.
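A minimal sketch, under the same assumptions as the earlier examples; int() is there because CSVToDict yields the age as a string:

```python
import apache_beam as beam

with beam.Pipeline() as p:
    (
        p
        | "read" >> beam.io.ReadFromText("people.csv", skip_header_lines=1)
        | "to dict" >> beam.ParDo(CSVToDict())
        | "to (key, age)" >> beam.Map(
            lambda person: (f"{person['team']}-{person['gender']}", int(person["age"]))
        )
        | "max age per key" >> beam.CombinePerKey(max)
        | "print" >> beam.Map(print)
    )
```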

Repo

I have collected the source code in the repo below.

sample-beam/05-beam-transform at main · bluebirz/sample-beam

Reference

Python transform catalog overview
Apache Beam is an open source, unified model and set of language-specific SDKs for defining and executing data processing workflows.