3 part ก่อนหน้านี้ เรา run Apache Beam pipeline บนเครื่องของเราเอง ทีนี้ เรามาลอง run บน Cloud กันบ้างฮะ แน่นอน ก็ต้องเป็น Google Cloud Platform นี่แหละ ซึ่งเราจะได้ใช้ Google Dataflow กัน


Google Dataflow คืออะไร

Google Dataflow เป็น service นึงของ GCP ที่สามารถทำงานกับ Apache Beam ได้ โดยมันจะสร้าง VM instance มาเป็น worker node เพื่อ run pipeline ที่เขียนเอาไว้ แล้วเราก็สามารถกำหนดรายละเอียดได้อีกว่า จะเอา RAM เท่าไหร่ CPU กี่ตัว เชื่อมต่อ network ยังไง และอีกเพียบฮะ

ตอนที่เราจะ deploy Beam pipeline ไปที่ Dataflow เนี่ย ก็มีหลายวิธีฮะ รวมถึงจะ integrate กับ GCP service อื่นๆ เช่น process files ใน Google Cloud Storage หรือรับ message จาก Google Pub/Sub หรือ ให้เขียนข้อมูลลง Google Cloud Firestore ก็ทำได้จบในตัวฮะ

สำหรับ part นี้ เราทำแบบง่ายๆ ให้พอเข้าใจกันก่อน โดยเราจะยก part ที่แล้ว ก็คือ อ่านและเขียน file นั่นแหละ แต่ให้ Dataflow เป็นคนทำแทน


Runner

Core concept อยู่ตรงนี้ ถ้าเราจะใช้ Google Dataflow เราต้องกำหนด runner ให้เป็น DataflowRunner ซึ่งกำหนดใน beam.Pipeline() หรือ python -m command ก็ได้ฮะ

แต่ก่อนอื่นเราต้องเปิดใช้ Dataflow API ก่อนนะ

เอาล่ะ เปิดแล้ว ก็ไปดูตัวอย่างกัน


ตัวอย่าง 1: สั่งตรงจาก local

อันนี้เบสิคสุด

เตรียม file

เราให้ pipeline ของเราหยิบมาแต่คนที่เป็นเพศ "F"

จะเห็นว่าใน function run_beam() เรากำหนด GCS file ที่จะอ่านและเขียนเอาไว้ที่บรรทัด 30-31 และก็กำหนด runner DataflowRunner ไว้ที่บรรทัด 37

สั่ง run

พอจะ run ที่ Dataflow ให้เราสั่งผ่าน Terminal

  • flag --region คือ region ของ Dataflow job
  • flag --runner ก็คือ DataflowRunner แต่ไม่ต้องใส่ก็ได้ เพราะเรากำหนดใน code แล้ว
  • flag --project คือ project ของ job
  • flag --temp_location คือ GCS folder เอาไว้เก็บ temporary file ที่ถูกสร้างโดย Dataflow job

ดูผลลัพท์

รอแปบนึง ก็จะเห็นสีเขียวๆ บอกว่า job เราทำงานเสร็จสิ้นฮะ

สังเกตมั้ย ว่าใน code มี import csv อยู่ข้างใน function mapToDict()

เคยให้ import ไว้บนสุด และเจอ error แล้วผมเข้าใจว่ามันเกิดจาก library ที่ import ไปนั้นมันไม่มีอยู่ใน worker node ทำให้ต้อง import ใน function ฮะ

file มาครบแล้ว

ข้อมูลถูกต้อง ผ่านฮะ


ตัวอย่าง 2: Container image

วิธีแรกมันง่าย แต่ใช้ได้กับพวก code เล็กๆ จบใน file เดียว ถ้ามีหลาย file เป็น code ขนาดใหญ่ขึ้นมาล่ะ

เราใช้ container image เพื่อรวม source code ไว้เป็นก้อนเดียว และนี่ก็เป็นข้อดีของ image ด้วยฮะ

เตรียม file

folder structure แบบนี้ฮะ

file CSVToDict

file main.py คราวนี้ เราเอาแค่คนที่เป็นเพศ "M" เท่านั้น

และ Dockerfile.

รอบนี้เรามี requirements.txt เป็น file เปล่าๆ โล่งๆ เพราะไม่ต้องการ library ใดๆ ฮะ

เตรียม container image

ทีนี้ เราก็มาเตรียม image ของเรา แล้วเอาไปไว้ที่ Google Artifact Registry.

ก่อนอื่นก็ต้องสร้าง repo ใน Google Artifact Registry

gcloud artifacts repositories create sample-beam \
   --repository-format=docker \
   --location=europe-west1 \
   --async

สั่ง run

มี repo พร้อมแล้วง เราก็สามารถ build และ push image ก่อนจะสร้าง Beam pipeline จาก image

  • flag --region คือ region ของ Dataflow job
  • flag --runner ก็คือ DataflowRunner แต่ไม่ต้องใส่ก็ได้ เพราะเรากำหนดใน code แล้ว
  • flag --project คือ project ของ job
  • flag --temp_location คือ GCS folder เอาไว้เก็บ temporary file ที่ถูกสร้างโดย Dataflow job
  • flag experiments=use_runner_v2 หมายถึง เราเปิดใช้ Dataflow Runner V2
  • flag sdk_container_image คือ ที่อยู่ของ image ที่เราใช้ทำ this Beam job

ดูผลลัพท์

หน้าตาเขียวๆ แบบนี้แหละดีแล้ว

ที่มุมล่างขวา จะเห็น image ตรง "sdk_container_image" ข้างใต้ "Pipeline options" นะฮะ เอาไว้ตรวจสอบทีหลังได้แหละ

ผลลัพท์ที่ต้องการคือ คนเพศ "M" เท่านั้น ผ่านล่ะ

ถ้าอยากอ่านเกี่ยวกับ Google Artifact Registry เพิ่มเติม ดูได้ตามลิงก์นี้ฮะ

Python package ของเราก็เก็บไว้ที่ repo ของเราเอง
Google Artifact Registry เป็น service ของ Google มีไว้เพื่อเก็บ docker image หรือ packge ในภาษา Python รวมถึง NodeJS และอีกหลายแบบ

ตัวอย่าง 3: Container image พร้อมรับ parameter

บางที เราก็อยากให้รับ parameter แทนที่จะ hard-code มัน ก็สามารถทำได้ โดยใช้ Argparse ฮะ ซึ่งเป็น library สำหรับจัดการ input จาก command line

เตรียม file

ที่ main.py ของเดิม เราแก้ให้รับ file path argument (บรรทัด 12-17) และขอปรับนิดนึงให้เลือกเฉพาะคนที่มี "id" เป็นเลขคี่ (บรรทัด 37)

สั่ง run

ทำเหมือนเดิมเลย แต่มี parameter เพิ่มเข้าไปอีกสองตัวนะฮะ

  • flag --region คือ region ของ Dataflow job
  • flag --runner ก็คือ DataflowRunner แต่ไม่ต้องใส่ก็ได้ เพราะเรากำหนดใน code แล้ว
  • flag --project คือ project ของ job
  • flag input_file และ output_file ก็เป็น parameter ของ image ตัวนี้
  • flag --temp_location คือ GCS folder เอาไว้เก็บ temporary file ที่ถูกสร้างโดย Dataflow job
  • flag experiments=use_runner_v2 หมายถึง เราเปิดใช้ Dataflow Runner V2
  • flag sdk_container_image คือ ที่อยู่ของ image ที่เราใช้ทำ this Beam job

ดูผลลัพท์

ผ่านเรียบร้อย เขียวอื๋อ สังเกตว่า ที่ "Pipeline options" มีค่าของ "input_file" กับ "output_file" แสดงอยู่นะ

file ผลลัพท์ก็แสดงคนที่มี id เลขคี่ออกมาถูกต้องด้วยแหละ

สำหรับ Argparse สามารถไปตามลิงก์นี้ อ่านเพิ่มเติมได้ฮะ

argparse - โยน parameter เข้า Python แบบ next level
argparse เป็น module จัดการ parameter กับ flag ที่เล่าๆ ไว้ข้างต้น รวมถึงช่วยทำ help และ error handling แบบเบื้องต้นให้ด้วยนะ

Git Repo

แปะลิงก์ github ของ part นี้เอาไว้ข้างล่างเลยฮะ

sample-beam/04-dataflow at main · bluebirz/sample-beam
Contribute to bluebirz/sample-beam development by creating an account on GitHub.

References