
Offloading Long-Running Tasks in Daemons: Async Executor Design

When building high-performance server daemons, we often encounter a dilemma: the main request loop demands sub-millisecond response times, yet the business logic includes tasks that simply take too long to complete synchronously. Think of configuration reloads, full data validation, heavy report generation, or massive state synchronization.

Executing these operations directly in the main thread or request handler is a recipe for disaster: service stalls, heartbeat timeouts, and unresponsive APIs. The solution lies in a robust Async Task Executor.

In this post, we’ll explore designing an execution model that ensures data consistency while providing precise state tracking, demonstrated with a concurrent implementation in Go.

The Core Challenge: More Than Just "Go Routine"

A common misconception among beginners is that asynchronous tasks are just a go func() away. While acceptable for prototypes, unmanaged concurrency in production leads to chaos:

  1. Resource Exhaustion: Uncontrolled goroutine creation can leak memory or saturate the CPU.
  2. State Blindness: Once a task is fired, the caller loses track of it. Is it queued? Running? Failed?
  3. Data Races: Concurrent access to results (a background goroutine writing while a foreground handler reads) causes instability.
  4. Duplicate Submissions: Without safeguards, the same heavy task might be triggered multiple times, effectively DoS-ing your own service.

To address these problems, we need three pillars: a Global Registry, a Lifecycle State Machine, and Guarded Result Access.

Design Patterns

1. The Global Registry

We need a central view of all in-flight and completed tasks. This is typically a thread-safe Map.

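To ensure consistency, the "Check-then-Act" sequence must run as a single atomic step: before submitting, lock the registry (or use an atomic method) to check whether the Task ID exists. If it does, return immediately; if not, reserve the spot before releasing the lock. If the check and the reservation were separate steps, two concurrent callers could both see "not found" and both start the task.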
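As a rough illustration of that rule, here is a minimal sketch using a plain map behind a mutex. The names Registry and Reserve are hypothetical and exist only for this example; the full implementation later in the post uses sync.Map instead.

```go
package main

import (
	"fmt"
	"sync"
)

// Registry is a hypothetical mutex-guarded task registry (illustration only).
type Registry struct {
	mu    sync.Mutex
	tasks map[string]string // task ID -> status, simplified to a string
}

func NewRegistry() *Registry {
	return &Registry{tasks: make(map[string]string)}
}

// Reserve checks for the ID and claims it while holding one lock, so no
// other goroutine can slip in between the check and the write.
// It reports whether the caller won the slot.
func (r *Registry) Reserve(id string) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, exists := r.tasks[id]; exists {
		return false // someone already owns this ID
	}
	r.tasks[id] = "CREATED"
	return true
}

func main() {
	reg := NewRegistry()
	fmt.Println(reg.Reserve("report-42")) // true: first caller reserves the ID
	fmt.Println(reg.Reserve("report-42")) // false: duplicate is rejected
}
```

The important detail is that the lookup and the insert share one critical section; splitting them into two locked calls would reintroduce the race.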

2. Lifecycle State Machine

A task is more than just "Start" and "End". To support upstream clients (like UIs or CLI tools), we need clear state transitions:

  • CREATED: The struct is initialized.
  • ENQUEUED: Waiting for an execution slot (thread pool logic).
  • STARTED: Worker has picked up the task.
  • FINISHED / FAILED: Terminal states.

This visibility allows operators to distinguish between "system is slow" (tasks stuck in STARTED) and "system is overloaded" (tasks piling up in ENQUEUED).
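One way to keep these transitions honest is a small lookup table of legal moves. The sketch below is an assumption about how that could look, not part of the executor shown later: the allowed table and the CanTransition helper are hypothetical names, and the set of legal edges is inferred from the list above.

```go
package main

import "fmt"

// Status mirrors the lifecycle stages listed above.
type Status string

const (
	Created  Status = "CREATED"
	Enqueued Status = "ENQUEUED"
	Started  Status = "STARTED"
	Finished Status = "FINISHED"
	Failed   Status = "FAILED"
)

// allowed maps each state to its legal successors (assumed for this sketch).
// Terminal states have no entry, so nothing can follow them.
var allowed = map[Status][]Status{
	Created:  {Enqueued},
	Enqueued: {Started},
	Started:  {Finished, Failed},
}

// CanTransition reports whether moving from one state to another is legal.
func CanTransition(from, to Status) bool {
	for _, next := range allowed[from] {
		if next == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(CanTransition(Enqueued, Started)) // true
	fmt.Println(CanTransition(Finished, Started)) // false: FINISHED is terminal
}
```

A guard like this would typically sit inside the method that updates a task's status, so an out-of-order update is rejected instead of silently overwriting a terminal state.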

3. Guarded Access

This is often overlooked. A background task updates intermediate states or writes partial results, while an API handler reads them.

In C++, we might use the GuardedPtr pattern; in Go, we can encapsulate the result object with a sync.RWMutex. The key rule: Never expose the raw data structure. Always provide a method that returns a consistent snapshot.
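The snapshot rule can be sketched generically. The example below assumes Go 1.18 or later for generics; Guarded, Mutate, Snapshot, and the Progress type are hypothetical names chosen for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// Progress is a hypothetical result type used only for this sketch.
type Progress struct {
	Done  int
	Total int
}

// Guarded hides a value behind a lock; callers can reach it only
// through Mutate and Snapshot, never directly.
type Guarded[T any] struct {
	mu  sync.RWMutex
	val T
}

// Mutate applies fn to the value while holding the write lock.
func (g *Guarded[T]) Mutate(fn func(*T)) {
	g.mu.Lock()
	defer g.mu.Unlock()
	fn(&g.val)
}

// Snapshot returns a copy of the value taken under the read lock.
func (g *Guarded[T]) Snapshot() T {
	g.mu.RLock()
	defer g.mu.RUnlock()
	return g.val
}

func main() {
	var p Guarded[Progress]
	p.Mutate(func(v *Progress) { v.Done, v.Total = 3, 10 })
	snap := p.Snapshot()
	fmt.Printf("%d/%d\n", snap.Done, snap.Total) // 3/10
}
```

Note that the copy is shallow: if T holds slices, maps, or pointers, the snapshot still shares that underlying data with the guarded value, so such fields need an explicit deep copy inside Snapshot.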

Implementation in Go

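The following code demonstrates a simplified executor. It uses sync.Map as the registry and a buffered channel to enforce concurrency limits (simulating a semaphore or thread pool). Note that the channel bounds only how many tasks run at once: each queued task still occupies a parked goroutine, so a production version would also cap the queue length.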

package main

import (
	"fmt"
	"sync"
	"time"
)

// Status defines the lifecycle stages
type Status string

const (
	Created  Status = "CREATED"
	Enqueued Status = "ENQUEUED"
	Started  Status = "STARTED"
	Finished Status = "FINISHED"
	Failed   Status = "FAILED"
)

// TaskResult encapsulates status and data
// Uses sync.RWMutex to simulate the "Guarded" pattern
type TaskResult struct {
	ID      string
	Status  Status
	Message string
	mu      sync.RWMutex
}

// Update provides thread-safe writes
func (t *TaskResult) Update(s Status, msg string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.Status = s
	t.Message = msg
}

// GetSnapshot provides thread-safe reads
func (t *TaskResult) GetSnapshot() (string, Status, string) {
	t.mu.RLock()
	defer t.mu.RUnlock()
	return t.ID, t.Status, t.Message
}

// TaskExecutor is the core engine
type TaskExecutor struct {
	registry sync.Map      // Global task registry
	limit    chan struct{} // Concurrency semaphore
}

func NewExecutor(concurrency int) *TaskExecutor {
	return &TaskExecutor{
		// Buffered channel acts as a semaphore
		limit: make(chan struct{}, concurrency),
	}
}

// Submit handles registration and scheduling
func (e *TaskExecutor) Submit(id string, work func()) {
	// 1. Registry check: prevent duplicates.
	// LoadOrStore checks and inserts in one atomic step and returns the
	// stored value, so no second lookup is needed.
	actual, loaded := e.registry.LoadOrStore(id, &TaskResult{ID: id, Status: Created})
	if loaded {
		fmt.Printf("[Warn] Task %s already exists, skipping.\n", id)
		return
	}
	task := actual.(*TaskResult)

	task.Update(Enqueued, "Waiting for slot")

	// 2. Async execution
	go func() {
		// Acquire token: blocks while the channel is full (queueing effect)
		e.limit <- struct{}{}
		defer func() { <-e.limit }() // Release token

		// A panic in work() would otherwise crash the whole daemon and
		// leave the task stuck in STARTED; record it as FAILED instead.
		defer func() {
			if r := recover(); r != nil {
				task.Update(Failed, fmt.Sprintf("panic: %v", r))
			}
		}()

		task.Update(Started, "Processing...")

		// Run actual business logic
		work()

		task.Update(Finished, "Completed successfully")
	}()
}

// Query allows external clients to check status safely
func (e *TaskExecutor) Query(id string) {
	if val, ok := e.registry.Load(id); ok {
		t := val.(*TaskResult)
		// Get a consistent snapshot
		id, s, m := t.GetSnapshot()
		fmt.Printf(">> Query Task[%s]: Status=%-10s | Msg=%s\n", id, s, m)
	} else {
		fmt.Printf(">> Query Task[%s]: Not Found\n", id)
	}
}

func main() {
	// Limit max concurrency to 2
	exec := NewExecutor(2)
	taskID := "backup-001"

	fmt.Println("--- Submitting Task ---")
	exec.Submit(taskID, func() {
		// Simulate heavy work
		time.Sleep(200 * time.Millisecond)
	})

	// Simulate polling
	// 1. Early check
	exec.Query(taskID)
	
	// 2. Mid-flight check
	time.Sleep(100 * time.Millisecond)
	exec.Query(taskID)
	
	// 3. Post-completion check
	time.Sleep(200 * time.Millisecond)
	exec.Query(taskID)
}

Trade-offs and Reflection

Lock Granularity

In this implementation, we use two layers of locking:

  1. sync.Map's internal synchronization: Protecting task existence (registry).
  2. TaskResult.mu: Protecting task visibility (data).

This separation is vital. If we locked the entire registry just to read a single task's status, high-frequency polling (e.g., from a frontend dashboard) would block new task submissions, degrading throughput.

Memory Management

The demo never removes entries from the registry. In production, the registry cannot grow forever. You need a TTL (Time To Live) mechanism or a periodic Janitor routine to remove tasks from the map some time after they reach a terminal state (FINISHED/FAILED).
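A janitor pass could look roughly like the sketch below. Everything here is an assumption for illustration: the entry type, its terminal and finishedAt fields, and the Sweep and runDemo functions are hypothetical, and the demo's TaskResult would need a completion timestamp (read under its lock) before something like this could apply to it.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// entry is a hypothetical registry record; it is never mutated after
// being stored in this sketch, so it is read without a lock.
type entry struct {
	terminal   bool      // true once the task is FINISHED or FAILED
	finishedAt time.Time // when the terminal state was reached
}

// Sweep deletes terminal entries older than ttl and returns how many
// it removed. sync.Map permits calling Delete from inside Range.
func Sweep(reg *sync.Map, ttl time.Duration, now time.Time) int {
	removed := 0
	reg.Range(func(key, value any) bool {
		e := value.(*entry)
		if e.terminal && now.Sub(e.finishedAt) > ttl {
			reg.Delete(key)
			removed++
		}
		return true // keep iterating
	})
	return removed
}

// runDemo seeds a registry, sweeps it once, and reports the counts.
func runDemo() (removed, remaining int) {
	var reg sync.Map
	now := time.Now()
	reg.Store("old", &entry{terminal: true, finishedAt: now.Add(-time.Hour)})
	reg.Store("fresh", &entry{terminal: true, finishedAt: now})
	reg.Store("running", &entry{})

	removed = Sweep(&reg, 10*time.Minute, now)
	reg.Range(func(_, _ any) bool { remaining++; return true })
	return removed, remaining
}

func main() {
	removed, remaining := runDemo()
	fmt.Println("removed:", removed, "remaining:", remaining) // removed: 1 remaining: 2
}
```

In a running daemon, Sweep would typically be called from a goroutine driven by a time.Ticker. Passing the current time in as a parameter keeps the function easy to test.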

Graceful Shutdown

When the service restarts, a hard kill leaves async tasks in an unknown state. A production executor implements Shutdown():

  1. Stop accepting new tasks.
  2. Wait for in-flight tasks to finish, for example by tracking them with a sync.WaitGroup.
  3. Force exit only after a timeout.
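The three steps above can be sketched as follows. This is a stripped-down illustration, not the executor from the demo: Pool, Go, Shutdown, and ErrClosed are hypothetical names, and the registry and concurrency limit are left out to keep the focus on shutdown.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// ErrClosed is returned for submissions made after shutdown began.
var ErrClosed = errors.New("executor is shutting down")

// Pool is a hypothetical minimal executor used only to show shutdown.
type Pool struct {
	mu     sync.Mutex
	closed bool
	wg     sync.WaitGroup
}

// Go runs work in the background unless shutdown has begun.
func (p *Pool) Go(work func()) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.closed {
		return ErrClosed // step 1: stop accepting new tasks
	}
	p.wg.Add(1) // Add under the lock so it cannot race with Wait
	go func() {
		defer p.wg.Done()
		work()
	}()
	return nil
}

// Shutdown stops intake, then waits up to timeout for in-flight work.
// It reports whether everything finished in time.
func (p *Pool) Shutdown(timeout time.Duration) bool {
	p.mu.Lock()
	p.closed = true
	p.mu.Unlock()

	done := make(chan struct{})
	go func() {
		p.wg.Wait() // step 2: wait for running tasks
		close(done)
	}()

	select {
	case <-done:
		return true
	case <-time.After(timeout):
		return false // step 3: caller may now force exit
	}
}

func main() {
	var p Pool
	_ = p.Go(func() { time.Sleep(50 * time.Millisecond) })
	fmt.Println("drained:", p.Shutdown(time.Second))
	fmt.Println("submit after shutdown:", p.Go(func() {}))
}
```

Shutdown only reports a timeout; it does not cancel running work. Tasks that must stop early would need a cancellation signal of their own, such as a context.Context passed into each work function.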

Conclusion

An Async Task Executor is foundational infrastructure for server-side development. By combining Go's channels with traditional locking mechanisms, we can build offloading solutions that are both efficient and safe. The key lies in keeping task state observable at every stage and keeping shared data safe under concurrent access.