Apache Beam มี package หลายตัวให้เราเอาไว้จัดการ input/output ซึ่งเราก็แค่ import แล้วเรียกใช้งานมันให้ถูกต้องก็พอแล้ว

ใน blog นี้ ผมขอเล่าถึง 3 IO (input/output) module ที่ได้ใช้บ่อยๆ กันฮะ


1. Text (Google Cloud Storage)

basic สุดละ กับพวก text file เนี่ย

Beam มี beam.io library ที่เราน่าจะคุ้นกันอยู่แล้ว และข้างในก็มี function ReadFromText() และ WriteToText() เพื่ออ่านและเขียน text file

สามารถใช้อ่านเขียน file ใน Google Cloud Storage ได้เหมือนกันฮะ เพราะมันก็คือ  text file เหมือนกันนั่นแหละ

  • บรรทัด 14: ReadFromText() เพื่ออ่าน input_file ที่เรารับมาจาก argparse
  • บรรทัด 16: WriteToText() เพื่อเขียนไปที่ output_file

pipeline นี้เอามาวาด diagram ได้ประมาณนี้ฮะ


2. Database (Google BigQuery)

ถัดจาก Google Cloud Storage ก็เป็น Google BigQuery ที่ผมใช้บ่อยไม่แพ้กันฮะ

  • บรรทัด 14: ReadFromBigQuery() แล้วส่งค่าที่ query= เพื่อให้มัน query ตามที่กำหนด
  • บรรทัด 18: เติม temp_dataset= ให้ Beam เขียน temporary data ที่มันจะ auto-generate ไปไว้ใน dataset นี้นะ
ℹ️
ถ้าเราไม่กำหนด temp_dataset=, Beam จะสร้าง dataset ใหม่ขึ้นมาเรื่อยๆ ทุกครั้งที่เราสั่ง run เลยฮะ
Beam generate temporary dataset แบบอัตโนมัติ

จาก pipeline ได้ diagram ประมาณนี้ฮะ


3. Messaging (Google Cloud Pub/Sub)

ทีนี้ มาที่ real-time กันบ้างฮะ

Google Cloud Pub/Sub ก็สามารถ integrate กับ Beam ได้ง่ายๆ เหมือนกันฮะ ซึ่งเรากำหนดได้ว่าจะไปต่อกับ publisher หรือ subscriber แล้วแต่การออกแบบ โดยตัวอย่างนี้ ผมจะให้ Beam ไปอ่านข้อมูลจาก subscriber ก่อนทำ transform แล้วส่งไปที่ topic

  • บรรทัด 10: set option streaming=True ให้ Beam run แบบ streaming pipeline ฮะ
  • บรรทัด 15: ReadFromPubSub() ให้ไปอ่านค่าจาก subscriber ที่กำหนด
  • บรรทัด 26: หลัง transform ก็ส่งค่าไปที่ topic ด้วย WriteToPubSub()
ℹ️
ตอนเรียก WriteToPubSub() ให้แปลง PCollection เป็น byte string ด้วยนะฮะ เช่น การใช้.encode()

เรามาลอง publish ข้อความลง topic แล้ว pull จาก subscription ปลายทางดูกันฮะ

สังเกตว่า originMessageId มีค่าตรงกัน เนื่องจากเรา parse มาจาก topic ในตอนที่ทำ transform ฮะ

Beam pipeline ตัวอย่างนี้ ทำงานประมาณนี้ฮะ


Repository

sample-beam/06-io at main · bluebirz/sample-beam
Contribute to bluebirz/sample-beam development by creating an account on GitHub.

References