DEV Community

Weerasak Chongnguluam
Weerasak Chongnguluam

Posted on

ใช้ Stream.unfold และ Enum.reduce แทน loop ในแบบ imperative

งานที่ทำวันนี้ต้องการเขียนโค้ดเพื่อ request API ที่เป็น pagination ซึ่งถ้าเราต้องการเอาข้อมูลทั้งหมด เราก็ต้อง request ไปทีละ page จนกว่าจะถึง page สุดท้าย

ทีนี้ถ้าเขียนแบบ imperative ที่มี loop ทั่วไปก็จะออกมาประมาณนี้ (ป.ล. โค้ดเป็น pseudo code)

result = []
page = 1
limit = 10
loop do
  resp = fetch(page: page, limit: limit)
  result = result ++ resp.list
  if page * limit >= resp.total do
    break
  end
  page = page + 1
end
Enter fullscreen mode Exit fullscreen mode

คือดึงข้อมูลจนกว่า page * limit จะมากกว่า total นั่นเอง

ทีนี้ถ้าเราจะทำในลักษณะนี้ใน Elixir เราสามารถเขียน recursive function เองก็ได้แล้วให้มี accumulator parameter ในการรวบรวม result ในแต่ละรอบที่เรียกซ้ำฟังก์ชัน

แต่ก็คิดว่า Elixir น่าจะมี function ที่ช่วยให้เราไม่ต้องเขียนเองเอาไว้แล้ว จนค้นไปเจอว่า Stream.unfold ใช้ร่วมกับ Enum.reduce นั่นสามารถทำได้

Stream คือ package ที่ช่วยให้เราจัดการกับข้อมูลที่ค่อยๆถูกสร้าง หรือค่อยๆถูกดึงออกมาโดยไม่ต้องมาเป็นก้อนใหญ่ๆก้อนเดียว ซึ่ง Stream.unfold นั้นก็ช่วยให้เราสร้าง Stream ของผลลัพธ์จากการเรียกฟังก์ชันใดๆ ตัวอย่างเช่นเราอยากสร้าง Stream ของการ fetch ข้อมูลทีละหน้าแบบด้านบนเราสามารถเขียนได้แบบนี้

Stream.unfold([start_page: 1, limit: 10], fn 
  [page: page, limit: limit] ->
    fetch(page, limit)
    |> case do
      %{list: list, total: total}
      when page * limit >= total ->
        {list, :halt}

      %{list: list} ->
        {list, [page: page + 1, limit: limit]}
    end

  :halt ->
    nil
end)
Enter fullscreen mode Exit fullscreen mode

จากโค้ดจะเห็นว่าเมื่อมีการทำให้ค่าจาก Stream emit ออกมานั้นจะเริ่มเอาค่าเริ่มต้นแรก [start_page: 1, limit: 10] ไปเรียก function ที่เราส่งให้กับ Stream.unfold ซึ่งเราก็จะเอาไปเรียก fetch(page, limit) จากนั้นก็เอาผลลัพธ์ที่ได้มาเช็คว่าถ้า page*limit >= total เราจะตอบกลับเป็น tuple ที่ค่าแรกเป็นผลลัพธ์ที่จะ emit ออกไปในครั้งนี้ และ ค่าที่สองคือ parameter ที่จะเอาไปเรียกฟังก์ชันในครั้งถัดไป ซึ่งเราก็กำหนดเป็น :halt เพื่อที่จะได้ไป match กับอีก case แล้วก็จะ return nil เพื่อบอก Stream.unfold ให้จบ stream เหมือนกับ break ใน imperative ที่เราเขียนให้ดูนั่นเอง ถ้าไม่เป็นตามเงื่อนไข เราก็ emit list ออกไปพร้อมกับส่ง [page: page+1, limit: limit] เป็น parameter ในรอบถัดไปเพื่อให้ fetch page ถัดไปนั่นเอง

การเรียกฟังก์ชันนี้จะยังไม่เกิดการเรียก fetch(page, limit) ทันทีเพราะมันจะแค่สร้าง Stream เตรียมเอาไว้จนกว่าเราจะเอา Stream ไปจัดการด้วยฟังก์ชันของ Enumerator อย่างในกรณีนี้เราต้องการรวบรวม ค่าของ list ที่ถูก emit ออกมาให้เป็น list เดียว เราเลยจะใช้ Enum.reduce ช่วย แล้วก็เอามา pipe ต่อกันได้แบบนี้

Stream.unfold([start_page: 1, limit: 10], fn 
  [page: page, limit: limit] ->
    fetch(page, limit)
    |> case do
      %{list: list, total: total}
      when page * limit >= total ->
        {list, :halt}

      %{list: list} ->
        {list, [page: page + 1, limit: limit]}
    end

  :halt ->
    nil
end)
|> Enum.reduce([], fn list, acc -> acc ++ list end)
Enter fullscreen mode Exit fullscreen mode

ทีนี้ถ้าฟังก์ชัน fetch ของเรานั้นมี error กลับออกมาด้วย โดยถ้าเรียกสำเร็จจะได้ {:ok, %{list: list, total: total}} ถ้าไม่สำเร็จจะได้ {:error, reason} แบบนี้ เราสามารถใช้ Enum.reduce_while เพื่อช่วย reduce จนกว่าจะเจอ pattern ที่เป็น error ได้ โดยในส่วนของ Stream.unfold ถ้าเราเจอ error ตอน fetch ก็ให้จบ stream เช่นกัน เขียนได้แบบนี้

Stream.unfold([start_page: 1, limit: 10], fn 
  [page: page, limit: limit] ->
    fetch(page, limit)
    |> case do
      {:ok, %{list: list, total: total}}
      when page * limit >= total ->
        {list, :halt}

      {:ok, %{list: list}} ->
        {list, [page: page + 1, limit: limit]}
      {:error, _reason} = error ->
        {error, :halt}
    end

  :halt ->
    nil
end)
|> Enum.reduce_while({:ok, []}, fn 
  {:ok, next_list}, {:ok, list} -> {:cont, {:ok, list ++ next_list}}
  {:error, _reason} = error, _acc -> {:halt, error}
end)
Enter fullscreen mode Exit fullscreen mode

คือถ้า fetch แล้วได้ error ก็จะ emit error เป็นค่าสุดท้ายแล้วรอบถัดไปก็ :halt จบ stream

ส่วน Enum.reduce_while นั้นตรงผลลัพธ์ของฟังก์ชันนั้นต้องตอบกลับเป็น tuple ถ้าจะให้จบ reduce tuple ค่าแรกต้องเป็น :halt แต่ถ้าจะต่อไปให้เป็น :cont ส่วนค่าที่สองใน tuple คือค่าที่เราจะให้เป็น accumulator ในแต่ละรอบ ส่วนถ้า stream ไม่มี error เลยก็จะ reduce จนจบ stream นั่นเอง

ขอฝาก Buy Me a Coffee

สำหรับท่านใดที่อ่านแล้วชอบโพสต์ต่างๆของผมที่นี่ ต้องการสนับสนุนค่ากาแฟเล็กๆน้อยๆ สามารถสนับสนุนผมได้ผ่านทาง Buy Me a Coffee คลิ๊กที่รูปด้านล่างนี้ได้เลยครับ

Buy Me A Coffee

ส่วนท่านใดไม่สะดวกใช้บัตรเครดิต หรือ Paypal สามารถสนับสนุนผมได้ผ่านทาง PromptPay โดยดู QR Code ได้จากโพสต์ที่พินเอาไว้ได้ที่ Page DevDose ครับ https://web.facebook.com/devdoseth

ขอบคุณครับ 🙏

Top comments (0)