Rust: Strategi Queue, Cache, dan Locking untuk Worker Terdistribusi menjawab bagaimana merancang sistem worker yang memanfaatkan antrean job, cache lokal, dan locking untuk mencegah duplikasi sekaligus memudahkan operasi ketika terjadi timeout atau retry. Artikel ini langsung fokus pada problem operasional utama tanpa pembukaan umum: bagaimana worker memilih job, memastikan cache dan status konsisten, menahan duplikasi, serta pulih dari kegagalan.

1. Desain antrean job terdistribusi

Antrean job adalah eksternal contract antar worker dan producer. Tujuan utamanya adalah menjaga ordering yang dapat diandalkan (sesuai kebutuhan) serta memberi tooling untuk retry dan observabilitas. Dalam konteks Rust, pendekatan yang paling umum adalah menyimpan job sebagai pesan serialisasi (JSON, MessagePack) di broker seperti Redis Streams, Kafka, atau PostgreSQL LISTEN/NOTIFY. Karena fokusnya pada kerangka kerja generik, kita anggap antrean Redis LIST sebagai contoh minimal.

Producer melakukan enqueue job dengan status pending, worker menge-claim job dengan operasi atomic. Ruang lingkup harus mencakup metadata seperti job_id, timestamp, dan nomor retry agar bisa mendeteksi timeout.

Contoh enqueue job dengan Tokio + Redis:

async fn enqueue_job(client: &redis::Client, payload: &JobPayload) -> redis::RedisResult<()> {
    let mut conn = client.get_async_connection().await?;
    let serialized = serde_json::to_string(payload)?;
    redis::cmd("LPUSH")
        .arg("jobs:queue")
        .arg(serialized)
        .query_async(&mut conn)
        .await?;
    Ok(())
}

Job payload bisa memuat ID unik dan informasi retry. Observasi dan metrik (misalnya job dipush per detik) ditambahkan via middleware logging atau integrasi Prometheus.

2. Cache lokal untuk menghindari pekerjaan duplikat

Worker sering kali perlu melihat status job sebelumnya untuk menghindari operasi ganda, terutama bila job tidak idempotent. Cache lokal (per proses atau per task) mempercepat keputusan tanpa memukul database eksternal setiap kali.

Gunakan struktur thread-safe seperti DashMap atau tokio::sync::RwLock untuk menyimpan state job yang baru diproses. Cache memegang entry ber-ttl pendek (misalnya 30–60 detik) sehingga konsistensi global tidak terganggu.

Contoh cache lokal sederhana:

type JobId = String;

struct LocalCache {
    map: DashMap,
}

impl LocalCache {
    fn insert(&self, id: JobId) {
        self.map.insert(id, Instant::now());
    }

    fn is_recent(&self, id: &JobId, window: Duration) -> bool {
        self.map.get(id)
            .map(|entry| entry.value().elapsed() < window)
            .unwrap_or(false)
    }
}

Penjagaan waktu dengan Instant menghindari pengambilan job dengan ID sama dalam jangka pendek. Bersamaan dengan cache, worker sebaiknya mengecek backend shared (misalnya Redis atau DB) sebelum memutuskan job benar-benar baru.

3. Locking ringan dan konsistensi

Untuk mencegah dua worker memproses job sama secara paralel, kita perlu locking minimal. Di Rust, lock bisa diimplementasikan secara distributed via Redis SETNX, atau lokal via tokio::sync::Mutex jika job ID dipartisi per instance.

Salah satu pola: ambil kunci di Redis dengan TTL singkat sebelum memproses job. Jika gagal mendapatkan API, worker menolak job agar dapat retry.

Contoh locking ringan (lokal) dilengkapi logging/metrics:

let active_jobs = Arc::new(Mutex::new(HashSet::new()));

async fn process(job_id: String, active_jobs: Arc>>) {
    let mut guard = active_jobs.lock().await;
    if !guard.insert(job_id.clone()) {
        tracing::warn!(job_id, "Job sudah sedang diproses");
        return;
    }
    drop(guard);

    // Tambahkan metrik start
    // ... proses job ...

    let mut guard = active_jobs.lock().await;
    guard.remove(&job_id);
    tracing::info!(job_id, "Job selesai");
}

Struktur ini mencatat metrik melalui crate seperti tracing dan memberi waktu tunggu lock sesuaikan jika kerjaan rentan deadlock. Pastikan lock release terjadi di blok finally (atau drop guard) agar tidak meninggalkan lock ketika panic.

4. Recovery saat timeout dan retry

Worker harus bisa mendeteksi job yang terjebak (timeout) dan mengembalikannya ke antrean atau menandai error. Dua teknik umum:

  • Visibility timeout: ketika worker mengambil job, rediset TTL-nya. Jika worker gagal menyelesaikan sebelum timeout, job otomatis tersedia kembali.
  • Watchdog monitoring: sistem pemantau memeriksa job yang sudah diambil tapi tidak selesai dalam jangka waktu tertentu, lalu memindahkannya ke antrean retry.

Implementasi simple:

async fn handle_claimed_job(job_id: JobId, redis: &redis::Client) {
    let timer = tokio::time::sleep(Duration::from_secs(60));
    tokio::pin!(timer);

    loop {
        tokio::select! {
            _ = &mut timer => {
                retry_job(job_id.clone(), redis).await;
                break;
            }
            result = process_job(&job_id) => {
                if result.is_ok() {
                    acknowledge_job(&job_id, redis).await;
                } else {
                    retry_job(job_id.clone(), redis).await;
                }
                break;
            }
        }
    }
}

Block select! memastikan recovery dan retry secara eksplisit. Logging pada setiap cabang membantu operasi memahami apakah job timeout, error, atau berhasil.

5. Observability dan operasi

Semua mekanisme di atas harus memberi data operasional. Pastikan:

  • Setiap job log dimulai/selesai, termasuk job_id dan durasi.
  • Metrik counters untuk enqueue, dequeue, lock timeout, dan retry.
  • Alert bila antrean menumpuk atau job gagal berkali-kali.

Gunakan crate seperti tracing + tracing-subscriber untuk log struktural, dan prometheus_client atau opentelemetry untuk metrik. Dalam Redis, gunakan key seperti jobs:status:failed agar dashboard operasi bisa segera terpanggil.

Kesimpulan

Strategi queue, cache, dan locking di Rust harus saling melengkapi: antrean harus atomic, cache lokal mengurangi beban, dan locking mencegah duplikasi. Tambahan observabilitas dan recovery timeout membuat sistem worker terdistribusi tetap dapat dipantau dan dipulihkan. Dengan pendekatan ini, tim operasi memiliki data konkret untuk mendiagnosa dan menyesuaikan kerja worker dalam lingkungan produksi.