ในงานจริง เรามักจะต้องทำ transformation ซับซ้อน แบบที่เขียน function ง่ายๆ ไม่กี่บรรทัดคงไม่ตอบโจทย์ฮะ และให้ code ของเรามันอ่านง่าย เป็นระบบระเบียบเรียบร้อยด้วย ดังนั้น blog นี้เราเลยมาคุยกันถึงการเขียน function และ classe กัน

แล้วใน Apache Beam เค้าเขียนกันยังไงน่ะ?


ParDo

ParDo เป็น function ใน Apache Beam ที่เอาไว้สั่ง execute transformation class ฮะ ด้วย syntax เริ่มต้นง่ายๆ กันก่อนเลย แบบนี้

Custom class

ที่ transformation class ของเรา ให้มัน inherits beam.DoFn ก่อนนะฮะ จากนั้นก็ต้องมี function process() อันนี้คือต้องมีเลยนะ เพราะ Beam จะไปเรียกใช้มันแน่นอน แบบอัตโนมัติด้วย

มันยังมี function อื่นอีก เดี๋ยวเราค่อยๆ ทำความรู้จักไปฮะ

Main script

ทีนี้ในส่วนของ main เราก็เรียกใช้ด้วย beam.ParDo(<class name>())


ตัวอย่างแรก

เรามา reuse ของเก่ากันฮะ

เราอยาก run Beam เพื่อ transform CSV เหมือนครั้งที่แล้วแหละ แต่รอบนี้ เราจะแยกส่วนออกมาเป็น transformation class

ตอนนี้เรามี class CSVToDictFn ที่ inherit beam.DoFn มาไว้เรียบร้อย ตาม code ข้างล่างนี้เลยฮะ

กลับไปที่ main ก็จะเรียกใช้ class แบบนี้

ซึ่งเวลา run ก็คือ มันจะไปเรียก function process เองเลยฮะ ทำให้ได้ผลลัพท์แบบรูปข้างล่างนี้

อย่างนึงที่ควรรู้ คือ beam.ParDo จะ accumulate ผลลัพท์ ดังนั้น เราควรใช้ yield เพราะ yield จะคืนค่าเป็น iterator ของแต่ละ element ในขณะที่ return จะคืนค่าเป็น iterator ของทุก elements

ถ้าเราใช้ return ผลลัพท์จะไม่ถูกต้องฮะ เช่นอันนี้ มันแสดงผลลัพท์เป็น key ของ dictionary เท่านั้นเลย


ตัวอย่าง 2 ที่ใช้ parameter

คราวนี้ ลอง advance อีกนิด ด้วยการที่เราจะ parse parameter ไปด้วย ตรงนี้ เราจำเป็นต้อง implement __init__()

ตอนนี้ class CSVToDictFn ของเรามี __init__() เพื่อรับตัวแปร schema สำหรับแปลงค่า CSV ให้เป็น type ต่างๆ กัน

ตอนเรียกใช้ ก็เขียนแบบนี้ฮะ

เราเตรียม schema ไว้ที่บรรทัด 8 และเรียกใช้ class ที่บรรทัด 23 พร้อมกับ parse schema ไปพร้อมกัน

logic ของ CSVToDictFn อันนี้ คือ เราจะมีค่าอยู่ 3 ค่า ได้แก่ field name, field type, และค่าจาก CSV จากนั้นเราจะใช้ zip เพื่อ map เป็น dictionary

ตรงนี้ วาดเป็น diagram ได้ประมาณนี้ฮะ