Apache beam มี transformation มาให้เราใช้หลายอย่างตั้งแต่แรกแล้วฮะ ดังนั้น สำหรับ blog นี้ เรามาดูกันว่า มี function ไหนน่าสนใจ เอาไปใช้กับงานของเราได้บ้างนะ


เตรียมของก่อน

ใน blog นี้เราจะใช้ CSV ข้อมูลคน 30 คนฮะ

เป็นข้อมูล ID, ชื่อ, เพศ, อาชีพ, ทีม, และอายุ

แล้วก็ reuse ของเก่า เป็น method transform CSV to dict ฮะ


ตัวอย่าง 1: สาวๆ อยู่ทีมไหน

เราอยากรู้ว่า แต่ละทีมมีผู้หญิงเป็นใครบ้าง

ลำดับความคิดของโจทย์นี้

  1. อ่าน file แล้ว beam.ParDo() เพื่อ transform เป็น dict
  2. ใช้ beam.Filter() เอาเฉพาะผู้หญิง
  3. beam.GroupBy() เพื่อ group ตาม "team"
  4. beam.Map() กับ function custom_print() เพื่อแสดงผลทีมและคนในทีม
  • beam.GroupBy() ต้องการ function เพื่อกำหนดว่า element นี้จะจัด group จาก property ไหนฮะ

ตัวอย่าง 2: หญิงเท่าไหร่ ชายกี่คน

เราอยากรู้ว่า ใน list อันนี้ มีผู้ชายและผู้หญิงอย่างละกี่คนกันนะ

ลำดับความคิดของโจทย์นี้

  1. อ่าน file แล้ว beam.ParDo() เพื่อ transform เป็น dict
  2. Group จากเพศด้วย beam.Partition()
  3. สำหรับกลุ่มผู้ชาย นับจำนวนด้วยbeam.combiners.Count.Globally() แล้วแสดงผล
  4. ทำเหมือนกันกับกลุ่มผู้หญิง
  • beam.Partition() ต้องการ function เพื่อกำหนดตัวเลขให้แต่ละ element ว่าจะไปอยู่ใน partition ไหน
    สำหรับเคสนี้ เราให้ตัวเลขอิงจาก index ของตัวแปรเพศ นั่นก็คือ genders.index()
  • beam.combiners.Count.Globally() คืนค่าเป็นตัวเลขจำนวน element ใน PCollection

เราใช้ branching ในตัวอย่างนี้ ซึ่งทำได้โดยใส่วงเล็บแยกกับ PCollection ที่เป็นผลลัพท์จาก block ก่อนหน้า

นั่นคือ male_people กับ female_people PCollection จาก PTransform ก่อนหน้าเนี่ย เราใช้มันเป็น PCollection เริ่มต้นของ block ต่อไปฮะ

วาด DAG ได้แบบนี้นะ

วิธีสร้าง DAG graph แบบนี้ หาอ่านได้ที่ตอนที่ 2 นะฮะ


ตัวอย่าง 3: อาชีพไหนฮิต

เราอยากรู้ว่า อาชีพไหน มีจำนวนเท่าไหร่ โดยเอาแค่ชื่ออาชีพแค่คำเดียว

ลำดับความคิดของโจทย์นี้

  1. อ่าน file แล้ว beam.ParDo() เพื่อ transform เป็น dict
  2. จากค่า "อาชีพ" ให้แยก (split) คำย่อยๆ ด้วย beam.FlatMap() และ str.split()
  3. เลือกแค่ตัวอักษรและตัวเลข (alphanumeric) ด้วย beam.Regex.replace_all()
  4. แปลงเป็นตัวพิมพ์เล็กด้วย beam.Map()
  5. คัดคำที่มีความหมายเป็นอาชีพ ด้วยbeam.Regex.matches()
  6. Group และนับด้วย beam.combiners.Count.PerElement()
  1. beam.FlatMap() รับ function ที่คืนค่าเป็น iterable แล้วแปลงเป็น PCollection
    จากตัวอย่าง เรามี string "Administrator, charities/voluntary organisations" แล้วแยกเป็น ["Administrator", "charities/voluntary", "organisations"] มันก็จะกลายเป็น PCollection
  2. beam.Regex.replace_all() ใช้ Regex เพื่อแทนที่ทุกคำในข้อความต้นฉบับให้เป็นค่าที่ต้องการ
    ในตัวอย่างนี้ เราแทนที่ทุกอักขระที่ไม่ใช่ตัวเลขหรือตัวอักษร (non-alphanumeric) ด้วย regex r"[^\w\d]" ให้เป็น empty string ผลลัพท์ก็เหมือนเราลบค่าพวกนั้นทิ้งไปฮะ
  3. beam.Regex.matches() เลือก element ด้วย Regex
    ตัวอย่างนี้เราต้องการชื่ออาชีพ เลยคัดที่ขึ้นต้นด้วยตัวอักษร 4 ตัวแรก แล้วจบด้วย "ist", "er", "or", "ian", หรือ "ant" ฮะ
  4. beam.combiners.Count.PerElement() คืนค่าเป็น PCollection ของ element ที่ไม่ซ้ำกัน (unique) พร้อมนับจำนวนให้ (number of occurrences)

มีบล็อกเกี่ยวกับ Regex หากสนใจฮะ

REGEX ไม่เล็กนะครับ
Regular Expression ถูกใช้เป็นเครื่องมือตรวจสอบและแก้ไขข้อความที่ใช้เป็นพื้นฐานสำหรับทุกภาษาโปรแกรมมิ่งเลยล่ะฮะ

ตัวอย่าง 4: ผู้อาวุโสประจำกลุ่ม

เราอยากรู้ว่า ในแต่ละทีม ใครแก่สุด แยกชายหญิง

ลำดับความคิดของโจทย์นี้

  1. อ่าน file แล้ว beam.ParDo() เพื่อ transform เป็น dict
  2. แปลงเป็น tuple ด้วย beam.Map() ใน format (team-gender, age)
  3. Group ด้วย beam.CombinePerKey() และให้ function max เพื่อหา age ที่มากที่สุด
  • beam.CombinePerKey() ต้องการ PCollection ที่มี key และ value
    ในตัวอย่างนี้ เรากำหนด key เป็น team-gender และให้ value คือ age จากนั้นตัว function จะ group key เข้าด้วยกัน แล้วคำนวณหา max ของ value จะได้เป็น unique keys ที่มี max ของ age นั่นเองฮะ

Repo

ตัวอย่างทั้งหมด มีใน repo ตามลิงก์ด้านล่างนี้ฮะ

sample-beam/05-beam-transform at main · bluebirz/sample-beam
Contribute to bluebirz/sample-beam development by creating an account on GitHub.

Reference

Python transform catalog overview
Apache Beam is an open source, unified model and set of language-specific SDKs for defining and executing data processing workflows, and also data ingestion and integration flows, supporting Enterprise Integration Patterns (EIPs) and Domain Specific Languages (DSLs). Dataflow pipelines simplify the…