Go Concurrency Patterns: WaitGroups and Goroutines for Production Systems

Master the essential patterns for building scalable, reliable AI and automation systems with proper synchronization, error handling, and resource management.

Understanding Go's Concurrency Foundation

Go's approach to concurrency stands apart from traditional threading models. Where other languages require manual thread management and careful synchronization, Go provides lightweight goroutines that make concurrent programming accessible and efficient. As noted in Cristian Curteanu's comprehensive guide to Golang concurrency patterns, these patterns have become essential for modern production systems.

A goroutine is a function executing concurrently with the rest of the program, launched with the go keyword. Goroutines aren't OS threads; they're managed by Go's runtime scheduler, which multiplexes thousands (or even millions) of goroutines onto a modest number of OS threads.
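
A minimal sketch of how cheap goroutines are to start. The helper launchMany is hypothetical, not from the guides cited above: it launches n goroutines, each reporting its ID on a buffered channel, and the caller collects exactly n values.

```go
package main

import "fmt"

// launchMany starts n goroutines; each sends its ID on a shared channel.
// Receiving n values tells the caller that every goroutine has run.
func launchMany(n int) int {
	ids := make(chan int, n) // buffered, so no sender ever blocks
	for i := 0; i < n; i++ {
		go func(id int) {
			ids <- id
		}(i)
	}
	sum := 0
	for i := 0; i < n; i++ {
		sum += <-ids
	}
	return sum
}

func main() {
	n := 10_000
	// IDs 0..n-1 sum to n*(n-1)/2 when every goroutine reports once.
	fmt.Println(launchMany(n) == n*(n-1)/2) // prints true
}
```

Ten thousand goroutines like these typically finish in milliseconds on an ordinary machine, because each starts with a small stack that the runtime grows only as needed.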

For AI and automation workflows, this capability transforms how we process data. Consider an automation pipeline that needs to process multiple AI model requests, fetch data from various sources, and aggregate results. Go's concurrency model allows these operations to run concurrently without hand-managed thread pools or callback hell. When building production AI systems, understanding these patterns is fundamental, much as pragmatic AI approaches for developers emphasize practical implementation strategies.

The Role of WaitGroups in Synchronization

While goroutines handle the "run independently" part, WaitGroups address the "wait for completion" requirement. A WaitGroup is a synchronization primitive that blocks until a group of goroutines finishes. Think of it as a countdown latch: you tell the WaitGroup how many tasks to expect, each goroutine signals completion, and the goroutine calling Wait blocks until all signals arrive.

The sync.WaitGroup provides three essential methods:

  • Add(delta int): Adds delta to the counter; call it before the go statement that launches each goroutine (not inside the goroutine), so Wait cannot see the counter at zero too early
  • Done(): Decrements the counter by one, signaling a goroutine's completion
  • Wait(): Blocks until the counter reaches zero
wg := &sync.WaitGroup{}
wg.Add(3) // Expecting three goroutines

for i := 0; i < 3; i++ {
	go func(id int) {
		defer wg.Done() // Always deferred for safety
		// Process item with ID: id
		result := processItem(id)
		fmt.Printf("Worker %d completed: %v\n", id, result)
	}(i)
}

wg.Wait() // Blocks until all three complete
fmt.Println("All workers finished")

As detailed in the WunderGraph deep dive on WaitGroups, the defer statement with Done() is crucial: it ensures the counter is decremented on every exit path, including early returns on error and panics that the goroutine recovers from. (An unrecovered panic crashes the whole program regardless.) Without this pattern, you risk waiting indefinitely for a goroutine that never signals completion. This is especially important in AI systems where external API calls may fail unexpectedly.

WaitGroup vs errgroup: Choosing the Right Tool

The choice between WaitGroup and errgroup isn't arbitrary--it reflects fundamentally different synchronization requirements. Understanding this distinction prevents bugs that only manifest under load or failure conditions. According to the technical comparison on Stack Overflow, this choice has significant implications for production systems.

WaitGroup provides pure synchronization: it tells you when goroutines finish, but nothing about their success or failure. If a goroutine encounters an error and returns early, WaitGroup has no mechanism to propagate that information or cancel other pending goroutines.

errgroup (golang.org/x/sync/errgroup) extends WaitGroup with critical capabilities:

  1. Error propagation: Wait() returns the first non-nil error returned by any goroutine
  2. Context cancellation: When the group is created with errgroup.WithContext, the first error cancels the derived context, signaling other goroutines to stop
  3. Simplified API: Combines WaitGroup counting with error handling in one primitive
// WaitGroup pattern - no error handling
var wg sync.WaitGroup
for _, item := range items {
	wg.Add(1)
	go func(item Item) {
		defer wg.Done()
		callAIModel(context.Background(), item) // Returned error is silently dropped
	}(item)
}
wg.Wait()

// errgroup pattern (golang.org/x/sync/errgroup) - proper error handling
g, ctx := errgroup.WithContext(context.Background())
for _, item := range items {
	item := item // Per-iteration copy; unnecessary from Go 1.22 onward
	g.Go(func() error {
		return callAIModel(ctx, item) // First error is returned by g.Wait()
	})
}
if err := g.Wait(); err != nil {
	log.Printf("Processing failed: %v", err)
}

For AI and automation systems, errgroup is almost always the better choice. Consider an automation workflow that calls multiple AI APIs: if one API call fails, continuing to call others wastes resources and delays error reporting. errgroup cancels the shared context on the first error, so the remaining calls can stop early (provided they honor ctx), saving both time and money. This pattern complements our approach to OpenAI vs open-source LLMs where multiple model integrations require robust error handling.

Practical Concurrency Patterns for AI Systems

Worker Pools for Controlled Concurrency

Worker pools represent one of the most versatile concurrency patterns in Go. At its core, a worker pool is a collection of goroutines (workers) that process tasks from a shared queue. This pattern elegantly solves the resource management problem: too many concurrent tasks can overwhelm system resources, while too few leave capacity unused. As covered in Cristian Curteanu's concurrency patterns guide, worker pools are essential for production AI systems.

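// Assumes AIPrompt, AIResult and callAIModel(ctx, AIPrompt) (AIResult, error)
// are defined elsewhere.
type AIWorkerPool struct {
	tasks   chan AIPrompt
	results chan AIResult
	wg      sync.WaitGroup
}

// NewAIWorkerPool starts `workers` goroutines that share one ticker, so
// rateLimit is the total requests per second across the pool (must be > 0).
func NewAIWorkerPool(ctx context.Context, workers, rateLimit int) *AIWorkerPool {
	pool := &AIWorkerPool{
		tasks:   make(chan AIPrompt, workers*2),
		results: make(chan AIResult, workers*2),
	}
	ticker := time.NewTicker(time.Second / time.Duration(rateLimit))

	for i := 0; i < workers; i++ {
		pool.wg.Add(1)
		go pool.worker(ctx, i, ticker.C)
	}

	// Close results after every worker exits so consumers can range over it.
	go func() {
		pool.wg.Wait()
		ticker.Stop()
		close(pool.results)
	}()

	return pool
}

func (p *AIWorkerPool) worker(ctx context.Context, id int, tick <-chan time.Time) {
	defer p.wg.Done()

	for prompt := range p.tasks {
		// Wait for a rate-limit tick, or stop if the context is cancelled
		select {
		case <-tick:
		case <-ctx.Done():
			return
		}

		result, err := callAIModel(ctx, prompt)
		if err != nil {
			log.Printf("Worker %d error: %v", id, err)
			continue
		}

		select {
		case p.results <- result:
		case <-ctx.Done():
			return
		}
	}
}

// Submit enqueues prompts from its own goroutine and then closes the task
// queue, so call it only once. Read Results() concurrently; otherwise a full
// results buffer would stall the workers.
func (p *AIWorkerPool) Submit(ctx context.Context, prompts []AIPrompt) {
	go func() {
		defer close(p.tasks)
		for _, prompt := range prompts {
			select {
			case p.tasks <- prompt:
			case <-ctx.Done():
				return
			}
		}
	}()
}

// Results returns successful results; the channel closes when all workers exit.
func (p *AIWorkerPool) Results() <-chan AIResult {
	return p.results
}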

For AI automation systems, worker pools are invaluable. When calling expensive AI APIs, you want enough concurrency for throughput but a hard cap so you stay within provider rate limits and don't exhaust local resources. A worker pool provides exactly this control: the number of workers bounds how many calls are in flight at once.
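
To see that bound in action, here is a self-contained sketch (stdlib only) that records the peak number of tasks in flight. The runPool function and the 2ms sleep standing in for an AI API call are hypothetical; the point is that peak concurrency never exceeds the worker count, however many tasks are queued.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runPool sends n tasks to a fixed number of workers and reports how many
// were processed and the peak number in flight at once.
func runPool(workers, n int) (processed, peak int64) {
	tasks := make(chan int)
	var inFlight, done, maxSeen atomic.Int64
	var wg sync.WaitGroup

	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for range tasks {
				cur := inFlight.Add(1)
				// Raise maxSeen to cur if cur is higher (lock-free CAS loop).
				for {
					old := maxSeen.Load()
					if cur <= old || maxSeen.CompareAndSwap(old, cur) {
						break
					}
				}
				time.Sleep(2 * time.Millisecond) // simulated API latency
				inFlight.Add(-1)
				done.Add(1)
			}
		}()
	}

	for i := 0; i < n; i++ {
		tasks <- i
	}
	close(tasks)
	wg.Wait()
	return done.Load(), maxSeen.Load()
}

func main() {
	processed, peak := runPool(4, 100)
	fmt.Println(processed, peak <= 4) // prints 100 true
}
```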

Fan-Out, Fan-In for Parallel Processing

The fan-out, fan-in pattern distributes work across multiple goroutines, then collects and consolidates results. This pattern excels when you have many independent tasks that can be processed in parallel. For developers working with LangChain.js, this pattern provides a robust foundation for building concurrent AI pipelines.

func FanOutBatchProcessing(ctx context.Context, items []Item) ([]Result, error) {
	// NumCPU suits CPU-bound work; I/O-bound API calls often tolerate more workers
	numWorkers := runtime.NumCPU()
	workCh := make(chan Item, len(items))
	resultsCh := make(chan Result, len(items))

	var wg sync.WaitGroup

	// Fan-out: Start workers
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()

			for item := range workCh {
				select {
				case <-ctx.Done():
					return
				default:
					result, err := processAIItem(ctx, item)
					if err != nil {
						log.Printf("Worker %d failed item %s: %v", workerID, item.ID, err)
						continue
					}
					select {
					case resultsCh <- result:
					case <-ctx.Done():
						return
					}
				}
			}
		}(i)
	}

	// Feed all work
	go func() {
		for _, item := range items {
			workCh <- item
		}
		close(workCh)
	}()

	// Fan-in: Close resultsCh once every worker is done, then drain it
	go func() {
		wg.Wait()
		close(resultsCh)
	}()

	results := make([]Result, 0, len(items))
	for result := range resultsCh {
		results = append(results, result)
	}

	// Non-nil if the batch was cut short by cancellation or timeout
	return results, ctx.Err()
}

For AI systems, this pattern transforms batch processing. Instead of making model calls one after another, you process several inputs at once, cutting total wall-clock time while the fixed worker count keeps resource usage bounded.

Context Cancellation for Graceful Shutdown

Go's context package provides a standardized way to propagate cancellation signals, deadlines, and request-scoped values across goroutines and API boundaries. This pattern is essential for building systems that can terminate gracefully when their work is no longer needed. The Go Blog's article on pipelines and cancellation explains the underlying idea of telling goroutines to stop by closing a shared done channel; context packages that idea into a single value passed as the first argument through every call in a request.

func ProcessAIWorkflowWithTimeout(ctx context.Context, items []Item) (AIResult, error) {
	// Create context with 30-second timeout for the entire workflow
	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel() // Runs on every return path and signals the goroutine to stop

	// Buffered (capacity 1) so the goroutine can always deliver its outcome
	// and exit, even after this function has returned on timeout
	resultCh := make(chan AIResult, 1)
	errCh := make(chan error, 1)

	go func() {
		// Step 1: Preprocess data
		preprocessed, err := preprocessData(items)
		if err != nil {
			errCh <- fmt.Errorf("preprocessing failed: %w", err)
			return
		}

		// Step 2: Call AI model (with context support)
		result, err := callAIModelWithContext(ctx, preprocessed)
		if err != nil {
			errCh <- fmt.Errorf("AI model failed: %w", err)
			return
		}

		// Step 3: Post-process results
		finalResult, err := postprocessResults(ctx, result)
		if err != nil {
			errCh <- fmt.Errorf("postprocessing failed: %w", err)
			return
		}

		resultCh <- finalResult
	}()

	select {
	case result := <-resultCh:
		return result, nil
	case err := <-errCh:
		return AIResult{}, err
	case <-ctx.Done():
		return AIResult{}, ctx.Err() // Timeout or parent cancellation
	}
}

For AI automation, context cancellation prevents wasted computation. If a user cancels a request or a timeout expires, you don't want expensive AI model calls continuing in the background. Canceling the context stops new work and lets goroutines exit cleanly, provided each blocking call selects on ctx.Done() or passes ctx to a context-aware client.

Error Handling in Concurrent Operations

Structured Error Propagation

Error handling in concurrent Go code requires special attention. Traditional patterns break down when errors occur across multiple goroutines, leading to incomplete error reporting or unhandled failures. According to Stack Overflow's analysis of context cancellation patterns, the choice of synchronization primitive directly impacts error handling capabilities.

The errgroup package provides simple error aggregation. When the group is created with errgroup.WithContext, the first goroutine to return an error cancels the shared context, and Wait() returns that error once all goroutines have finished. This "fail fast" behavior prevents wasting resources on doomed operations.

func BatchAIModelProcessing(ctx context.Context, prompts []AIPrompt) ([]AIResult, error) {
	g, ctx := errgroup.WithContext(ctx)
	// Optionally cap concurrency with g.SetLimit(n)

	// Each goroutine writes only its own index, so no mutex is needed
	results := make([]AIResult, len(prompts))

	for i, prompt := range prompts {
		i, prompt := i, prompt // Per-iteration copies; unnecessary from Go 1.22 onward
		g.Go(func() error {
			// Skip the call if another goroutine has already failed
			if err := ctx.Err(); err != nil {
				return err
			}

			result, err := callAIModel(ctx, prompt)
			if err != nil {
				return fmt.Errorf("prompt %d failed: %w", i, err)
			}

			results[i] = result
			return nil
		})
	}

	// Returns the first error; the shared ctx was cancelled when it occurred
	if err := g.Wait(); err != nil {
		log.Printf("Batch processing failed: %v", err)
		return results, err
	}

	return results, nil
}

Aggregating Multiple Errors

Sometimes you want to continue processing all items even when some fail, then report all errors at once. Go 1.20+ provides errors.Join for this purpose:

func ProcessAllWithErrorAggregation(ctx context.Context, items []Item) (ProcessedBatch, error) {
	g, ctx := errgroup.WithContext(ctx)
	var mu sync.Mutex
	var errs []error // Named errs so it doesn't shadow the errors package
	var successfulResults []AIResult

	for _, item := range items {
		item := item // Per-iteration copy; unnecessary from Go 1.22 onward
		g.Go(func() error {
			result, err := processItem(ctx, item)

			mu.Lock()
			defer mu.Unlock()
			if err != nil {
				errs = append(errs, fmt.Errorf("item %s: %w", item.ID, err))
				return nil // Don't propagate - collect all errors
			}
			successfulResults = append(successfulResults, result)
			return nil
		})
	}

	_ = g.Wait() // Always nil here: goroutines record errors instead of returning them

	if len(errs) > 0 {
		return ProcessedBatch{
			Results: successfulResults,
			Partial: true,
		}, fmt.Errorf("encountered %d errors: %w", len(errs), errors.Join(errs...))
	}

	return ProcessedBatch{
		Results: successfulResults,
		Partial: false,
	}, nil
}

For AI systems, this pattern balances fault tolerance with efficiency. You can process all items, capturing which succeeded and which failed, then decide whether to retry or report partial results.

Cost Optimization Through Concurrency

Resource-Bounded Processing

AI systems often involve expensive operations--GPU inference, API calls with per-request costs, or computationally intensive processing. Concurrency patterns help you control costs by bounding resource usage. As highlighted in the WunderGraph analysis of WaitGroup performance, proper concurrency management prevents runaway resource consumption.

Combining a semaphore (a buffered channel with capacity maxConcurrent) with a ticker-based rate limiter ensures you never exceed your concurrency or request-rate limits:

type CostOptimizedAIPool struct {
	poolSize  int
	ticker    *time.Ticker // nil when rate limiting is disabled
	semaphore chan struct{}
}

func NewCostOptimizedAIPool(maxConcurrent int, requestsPerSecond int) *CostOptimizedAIPool {
	pool := &CostOptimizedAIPool{
		poolSize:  maxConcurrent,
		semaphore: make(chan struct{}, maxConcurrent),
	}

	if requestsPerSecond > 0 {
		interval := time.Second / time.Duration(requestsPerSecond)
		pool.ticker = time.NewTicker(interval)
	}

	return pool
}

// Close stops the rate-limit ticker. Do not call Process after Close.
func (p *CostOptimizedAIPool) Close() {
	if p.ticker != nil {
		p.ticker.Stop()
	}
}

func (p *CostOptimizedAIPool) Process(ctx context.Context, prompt AIPrompt) (AIResult, error) {
	// Acquire semaphore (blocks while maxConcurrent calls are in flight)
	select {
	case p.semaphore <- struct{}{}:
		defer func() { <-p.semaphore }() // Release the slot when Process returns
	case <-ctx.Done():
		return AIResult{}, ctx.Err()
	}

	// Apply rate limiting if configured
	if p.ticker != nil {
		select {
		case <-p.ticker.C:
		case <-ctx.Done():
			return AIResult{}, ctx.Err()
		}
	}

	return callExpensiveAIModel(ctx, prompt)
}

Avoiding Goroutine Leaks

Memory leaks in concurrent Go programs often stem from goroutines that never exit, typically because they are blocked forever sending to or receiving from a channel that no other goroutine will ever touch. Each leaked goroutine consumes memory and potentially holds references to other objects, preventing garbage collection. Over time, this leads to resource exhaustion and increased operational costs.

Prevention requires disciplined use of context cancellation:

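func SafeAIBatchProcessing(ctx context.Context, prompts []AIPrompt) error {
	g, ctx := errgroup.WithContext(ctx)

	for _, prompt := range prompts {
		prompt := prompt // Per-iteration copy; unnecessary from Go 1.22 onward
		g.Go(func() error {
			// CRITICAL: Check context before starting expensive work
			if err := ctx.Err(); err != nil {
				return err
			}

			// callAIWithRetry and saveResult must also honor ctx so that
			// in-flight work stops when the group is cancelled
			result, err := callAIWithRetry(ctx, prompt)
			if err != nil {
				return fmt.Errorf("AI call failed: %w", err)
			}

			return saveResult(ctx, result)
		})
	}

	// Results are persisted by saveResult, so only the first error is returned
	return g.Wait()
}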

The key principle: every goroutine should respect context cancellation. Checking ctx before starting work, and passing ctx into every blocking call, ensures that when the caller no longer needs the results, pending work stops promptly and compute costs aren't wasted.

Integration Patterns for AI Workflows

Chained Processing with Context

AI workflows often involve multiple stages: data preprocessing, model inference, post-processing, and storage. Chaining these stages with proper cancellation support creates robust pipelines:

// Payload is assumed to carry data between stages (e.g. RawData, Prediction)
type PipelineStage func(context.Context, Payload) (Payload, error)

// Pipeline runs its stages in order, stopping at the first error or cancellation
type Pipeline []PipelineStage

func (p Pipeline) Execute(ctx context.Context, input Payload) (Payload, error) {
	current := input

	for i, stage := range p {
		// Don't start the next stage if the caller has given up
		if err := ctx.Err(); err != nil {
			return Payload{}, err
		}
		output, err := stage(ctx, current)
		if err != nil {
			return Payload{}, fmt.Errorf("stage %d failed: %w", i+1, err)
		}
		current = output
	}

	return current, nil
}

// Example: Complete AI processing pipeline
func RunAIWorkflow(ctx context.Context, rawData []byte) (Prediction, error) {
	pipeline := Pipeline{
		SanitizeInput,     // Clean and validate input data
		EnrichWithContext, // Add relevant context
		CallPrimaryModel,  // Main AI model inference
		ValidateOutput,    // Ensure quality standards
		FormatForStorage,  // Prepare for database storage
	}

	out, err := pipeline.Execute(ctx, Payload{RawData: rawData})
	if err != nil {
		return Prediction{}, err
	}
	return out.Prediction, nil
}

Parallel Model Inference

When you need results from multiple AI models, parallel execution reduces latency dramatically:

func MultiModelInference(ctx context.Context, input Input) (MultiModelResult, error) {
	g, ctx := errgroup.WithContext(ctx)

	// Each goroutine writes only its own variable, and g.Wait() guarantees
	// those writes are visible afterwards, so no mutex is needed
	var modelAResult, modelBResult, modelCResult Result

	// Model A - Text generation
	g.Go(func() error {
		result, err := callTextModel(ctx, input)
		if err != nil {
			return fmt.Errorf("text model failed: %w", err)
		}
		modelAResult = result
		return nil
	})

	// Model B - Image analysis
	g.Go(func() error {
		result, err := callImageModel(ctx, input)
		if err != nil {
			return fmt.Errorf("image model failed: %w", err)
		}
		modelBResult = result
		return nil
	})

	// Model C - Sentiment analysis
	g.Go(func() error {
		result, err := callSentimentModel(ctx, input)
		if err != nil {
			return fmt.Errorf("sentiment model failed: %w", err)
		}
		modelCResult = result
		return nil
	})

	if err := g.Wait(); err != nil {
		return MultiModelResult{}, err
	}

	return CombineModelResults(modelAResult, modelBResult, modelCResult), nil
}

This parallel approach can reduce total processing time from the sum of all model latencies to approximately the latency of the slowest model--significant savings when working with multiple AI services. These patterns form the foundation for building production-grade AI automation systems that scale reliably.

Best Practices Summary

Key principles for production concurrency

Default to errgroup

For most concurrent operations, errgroup provides better error handling and cancellation support than WaitGroup alone.

Always defer Done()

Inside every goroutine, defer wg.Done() as the first statement so the counter is decremented on every exit path, including early returns and recovered panics.

Respect context cancellation

Check ctx.Done() in long-running operations and exit promptly when cancelled.

Bound your concurrency

Use worker pools or semaphores to prevent overwhelming system resources or external services.

Handle errors explicitly

Don't ignore errors in goroutines--propagate them or log them appropriately.

Test under failure conditions

Concurrency bugs often manifest only when things go wrong. Test with timeouts and simulated failures.
