We sometimes have to apply complex conditions in our Beam pipeline. In this blog, we will see together how to turn those complex ideas into a simple, readable, yet powerful workflow.


Quest today

We have prepared these books in a CSV file:

However, we also have a list of banned books' ISBNs:

We have to put each book into exactly one of these groups:

  • "BANNED": any book that is in the ban list and was published after 1970. These books are sent to "Midnight library". A listed book published in or before 1970 is unbanned.
  • "ANTIQUE": any book published in or before 1970. These books are sent to "Archive common library".
  • "MODERATE": any other book published between 1971 and 2017. These books are sent to "Central library".
  • "MODERN": any other book published in 2018 or later. These books are sent to "New Bloom library".
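
The rules above can be sketched as a plain Python function before any Beam code is involved. This is only an illustration: the function name classify, its parameters, and the LIBRARIES mapping are assumptions made for this sketch, not names from the original pipeline.

```python
# Destination library per group, taken from the rules listed above.
LIBRARIES = {
    "BANNED": "Midnight library",
    "ANTIQUE": "Archive common library",
    "MODERATE": "Central library",
    "MODERN": "New Bloom library",
}


def classify(year: int, is_banned: bool) -> str:
    """Return the group name for a book (hypothetical helper)."""
    if is_banned and year > 1970:
        return "BANNED"    # in the ban list and published after 1970
    if year <= 1970:
        return "ANTIQUE"   # also covers listed books that are unbanned by age
    if year <= 2017:
        return "MODERATE"  # 1971 to 2017
    return "MODERN"        # 2018 or later
```

The ban check comes first so that the year ranges below it only need to handle books that are not banned.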

Side inputs

First things first: we need to be able to tell which books are in the ban list. One way to add the ban status to the book list is "side inputs".

Side inputs are additional inputs that we pass directly to a PTransform.

With side inputs, the processing function can take extra parameters. For example:

Here we add a new key, is_banned, based on whether the book's ISBN is in the banned_book_list parameter.

To call this DoFn, there are 2 ways.

  1. Prepare the input before the pipeline
  2. Prepare the input inside the pipeline
✳️
When the input is prepared inside the pipeline, it is a PCollection, which cannot be passed as a side input directly. We have to wrap it with beam.pvalue.AsIter() for many values (e.g. a list) or beam.pvalue.AsSingleton() for a single value (e.g. an integer).

After the transformation with these side inputs, the output looks like this:


Tagging

Now each book in the list carries a ban flag. The next step is to classify each book into one of the groups.

It is time to use the tagging feature, which lets us attach a tag to each element of the PCollection.

We apply the grouping logic here and, in each if-clause, return the transformed element with its particular tag in the format yield beam.pvalue.TaggedOutput(self.<tag>, element).

With the tagging DoFn ready, we call it like this:

Look at lines 12-16: beam.ParDo(<DoFn>).with_outputs(<tags>) executes that DoFn and returns the elements together with their tags.

On line 21, we select one tag by indexing the result like a list, PCollection[<tag>], and then apply further PTransforms to it.

An example element after this step looks like this:


Write to files

With side inputs and tagging applied, the results are ready to be written out to files.

Example "BANNED" books:

Example "MODERN" books:


Repo

sample-beam/08-tag-n-side-inputs at main · bluebirz/sample-beam

References