Best Practice Dalam Menggunakan Celery

Muhammad Yunus
4 min readMay 23, 2020

--

Asynchronous task, Postponed Task, multiple Queue, Data Chunk, Task Group, Task Priority, Error Handling, Distributed Lock, Monitoring & Observability

Hai semuanya, setelah sebeumnya kita bahas konsep kerja dan arsitektur Celery disini,

Sekarang akan kita bahas best practice dalam menggunakan Celery untuk usecase yang akan kita hadapi. Jika belum coba install Celery, kalian bisa ikuti pada tutorial berikut,

Berikut adalah outline yang akan kita bahas dalam tulisan ini,

  • Asynchronous task,
  • Postponed Task,
  • Multiple Queue,
  • Data Chunk,
  • Task Group,
  • Task Priority,

Asynchronous Task

Celery didesain untuk menjalankan task secara asynchronous dengan menggunakan method .delay() ataupun .apply_async().

.apply_async() , mengirimkan task message dan support execution option, seperti eta , countdown , sampai expire yang dapat digunakan untuk mengatur kapan dan bagaimana task bekerja.

.delay() , mengirimkan task message tanpa support execution.

untuk percobaan awal kita dapat menggunakan delay() untuk kemudahan. Namun dalam usecase sebenarnya, disaraknan menggunakan apply_async() .

Varian penggunaan .delay() :

  • T.delay(arg, kwarg=value)

Varian penggunaan apply_async() :

  • T.apply_async((arg,), {'kwarg': value})
  • T.apply_async(countdown=10.
  • T.apply_async(eta=now + timedelta(seconds=10))
  • T.apply_async(countdown=60, expires=120)
  • T.apply_async(expires=now + timedelta(days=2))

Postponed Task

Task Celery dapat ditunda eksekusinya jika menggunakan apply_async(). Ada dua parameter yang dapat digunakan, yaitu :

  • eta : menjalankan task pada waktu tertentu, misalknya jam 07:15 pagi, atau tiap hari minggu jam 24:00.
  • countdown : menjalankan task N detik setelah apply_async() dipanggil.

contoh penerapanya :

  • T.apply_async(countdown=10) jalankan task 10 detik dari sekarang.
  • T.apply_async(eta=now + timedelta(seconds=10)) jalankan task 10 detik dari sekarang menggunakan eta.
  • T.apply_async(eta=datetime(2020, 5, 23, 15, 0)) jalankan task pada tanggal 23/05/2020 jam 15:00.

Multiple Queue

Celery secara default memiliki single queue yand dinamanakan celery. Ketika kita menambahkan task tanpa memberitau queue mana yang akan digunakan, maka Celery akan memilih default queue untuk digunakan.

Pada tulisan pertama kita familiar dengan mendefinisikan task sebagai berikut,

Bayangkan jika kita memiliki lebih dari satu task berada pada 1 queue. katakanlah task A dan task B, task A misalnya memiliki prioritas lebih tinggi dibandingkan task B. Disuatu titik katakanlah task B memiliki kuantitas yang sangat banyak dibandingkan task A. Sehingga task A akan semakin riskan untuk dijalankan karena nantiya tidak dapat kita atur prioritasnya dan dijalankan dalam satu server yang sama. Sehingga sangat logis jika kita pisahkan dua task ini kedalam queue yang berbeda.

Celery dapat membuat queue secara otomatis pada broker dengan mengatur,

parameter ini secara default sudah bernilai True . Sehingga kita bisa atur misalnya task import_feed untuk di-routing ke queue feeds dengan cara berikut,

atau pada saat menjalankan worker untuk task tersebut dapat ditambahkan parameter -Q , penggunaan parameter ini akan menganulir konfigurasi yang kita buat pada .conf.task_route() .

Data Chunk

Ini berguna saat task memproses iterable data, seperti object record dari table database yang sangat banyak. Misalnya kita memiliki 100000 object record untuk dikerjakan, jika kita chunk menjadi 1000 record / task, maka kita akan memiliki 100 task di queue, yang dapat dijalankan baik secara sequencial maupun parallel.

Berikut iplementasinya dengan menggunakan .chunk() ,

Implementasi dengan manual chunk,

Task Group

grouping task memungkinkan kita dapat menjalankan task lebih cepat untuk kasus iterable data. contohnya untuk data chunk diatas, kita berhasil memecah 100.000 record menjadi 1000 record /task yang akan menghasilkan 100 task pada queue. Namun proses ini berjalan squencial. misalnya 1 task rata-rata dikerjakan dalam 1 menit, maka keseluruhan record bisa diselesaikan dalam waktu 100 menit. Untuk mempercepat proses tersebut kita bisa gunakan .goup() sehingga prosesnya menjadi parallel.

implementasi lainya bisa dengan memanfaatkan signature .s() , sehingga result dari task ke-1 akan dilewatkan ketask selanjutnya, sehingga dengan .goup() dapat dijadikan satu result untuk semua task yang djalankan secara parallel.

Contoh diatas menggunakan dua buah task. Task pertama digunakan untuk menampung result dari task untuk mencari provider terbaik bagi user x.

Task Priority

Seperti yang kita bahas sebelumnya, kita berhasil merouting task kedalam queue berbeda untuk memisahkan task dari level prioritasnya. Namun kita belum mendefinisikan bagaimana queue tersebut akan dihandle, seperti misalnya berapa banyak worker yang akan digunakan.

Saat menjalankan worker untuk suatu task kita dapat atur max count current thread dengan cara berikut,

Bahkan pada satu queue pun untuk task yang sama atau bebeda bisa saja memiliki prioritas berbeda,

Nilai priority berkisar antara nilai bulat 0–9.

Nah sekian best practice penggunaan Celery yang bisa kita coba. Selain list diatas, masih banyak yang lainya dan tentu saja akan bermanfaat untuk use-case kita kedepanya.

Semoga bermanfaat, sekian dan terima kasih.

Sumber :

--

--

Muhammad Yunus
Muhammad Yunus

Written by Muhammad Yunus

IoT Engineer, Software Developer & Machine Learning Enthusiast

No responses yet