Skip to main content
Help improve this documentation

This documentation is still new and evolving. If you spot any mistakes, unclear explanations, or missing details, please open an issue.

Your feedback helps us improve!

Core - Channel helpers

This page lists all operations on channels, available in the core package of lo.

  • ChannelDispatcher

    ChannelDispatcher distributes messages from a stream to multiple channels based on a strategy.

    stream := make(chan int, 100)
    for i := 0; i < 100; i++ {
    stream <- i
    }
    close(stream)

    channels := lo.ChannelDispatcher(stream, 3, 10, lo.DispatchingStrategyRoundRobin[int])
    // Returns 3 channels with round-robin distribution
    Prototype:
    func ChannelDispatcher[T any](stream <-chan T, count, channelBufferCap int, strategy DispatchingStrategy[T]) []<-chan T
  • SliceToChannel converts a slice to a channel with specified buffer size.

    items := []int{1, 2, 3, 4, 5}
    ch := lo.SliceToChannel(10, items)

    for item := range ch {
    fmt.Println(item)
    }
    // Prints 1, 2, 3, 4, 5
    Prototype:
    func SliceToChannel[T any](bufferSize int, collection []T) <-chan T
  • ChannelToSlice converts a channel to a slice.

    ch := make(chan int, 3)
    ch <- 1
    ch <- 2
    ch <- 3
    close(ch)

    slice := lo.ChannelToSlice(ch)
    // slice contains [1, 2, 3]
    Prototype:
    func ChannelToSlice[T any](ch <-chan T) []T
  • Generator creates a channel from a generator function.

    gen := lo.Generator(10, func(yield func(int)) {
    for i := 0; i < 10; i++ {
    yield(i * 2)
    }
    })

    for item := range gen {
    fmt.Println(item)
    }
    // Prints even numbers 0, 2, 4, 6, 8, 10, 12, 14, 16, 18
    Prototype:
    func Generator[T any](bufferSize int, generator func(yield func(T))) <-chan T
  • FanIn merges multiple upstream channels into a single downstream channel, reading from all input channels and forwarding their values to one output channel. The channelBufferCap parameter sets the buffer size of the output channel.

    ch1 := make(chan int, 2)
    ch2 := make(chan int, 2)
    ch1 <- 1
    ch1 <- 2
    ch2 <- 3
    ch2 <- 4
    close(ch1)
    close(ch2)

    merged := lo.FanIn(10, ch1, ch2)
    var result []int
    for item := range merged {
    result = append(result, item)
    }
    // result: []int{1, 2, 3, 4} (order may vary as goroutine scheduling differs)
    Prototype:
    func FanIn[T any](channelBufferCap int, upstreams ...<-chan T) <-chan T
  • ChannelMerge merges multiple channels into a single channel.

    ch1 := make(chan int, 2)
    ch2 := make(chan int, 2)
    ch1 <- 1
    ch1 <- 2
    ch2 <- 3
    ch2 <- 4
    close(ch1)
    close(ch2)

    merged := lo.ChannelMerge(10, ch1, ch2)
    var result []int
    for item := range merged {
    result = append(result, item)
    }
    // result contains [1, 2, 3, 4] (order may vary)
    Prototype:
    func ChannelMerge[T any](channelBufferCap int, upstreams ...<-chan T) <-chan T
  • FanOut broadcasts every message from a single upstream channel to multiple downstream channels.

    upstream := make(chan int, 6)
    for i := 0; i < 6; i++ {
    upstream <- i
    }
    close(upstream)

    downstreams := lo.FanOut(3, 10, upstream)
    // Returns 3 channels, each receiving all 6 items (every message is broadcast to every channel)
    Prototype:
    func FanOut[T any](count, channelsBufferCap int, upstream <-chan T) []<-chan T
  • Buffer reads up to size items from a channel and returns them as a slice.

    ch := make(chan int, 10)
    for i := 1; i <= 5; i++ {
    ch <- i
    }
    close(ch)

    items, length, readTime, ok := lo.Buffer(ch, 3)
    // items: []int{1, 2, 3}
    // length: 3
    // readTime: ~0s (immediate read from buffered channel)
    // ok: true (the requested 3 items were read; ok is false only when the channel is closed and drained before size items are read)

    Batch (Deprecated)

    Batch is an alias for Buffer.

    ch := make(chan string, 5)
    ch <- "a"
    ch <- "b"
    ch <- "c"
    close(ch)

    batch, length, readTime, ok := lo.Batch(ch, 2)
    // batch: []string{"a", "b"}
    // length: 2
    // readTime: ~0s (immediate read from buffered channel)
    // ok: true (the requested 2 items were read; ok is false only when the channel is closed and drained before size items are read)

    BufferWithContext

    BufferWithContext reads up to size items from a channel with context cancellation.

    ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
    defer cancel()

    ch := make(chan int)
    go func() {
    time.Sleep(50 * time.Millisecond)
    ch <- 1
    time.Sleep(100 * time.Millisecond)
    ch <- 2
    }()

    items, length, readTime, ok := lo.BufferWithContext(ctx, ch, 5)
    // items: []int{1} (only first item received before timeout)
    // length: 1
    // readTime: ~100ms (context timeout)
    // ok: true (the read was ended by the context deadline; ok is false only when the channel is closed)
    Prototypes:
    func Buffer[T any](ch <-chan T, size int) (collection []T, length int, readTime time.Duration, ok bool)
    func BufferWithContext[T any](ctx context.Context, ch <-chan T, size int) (collection []T, length int, readTime time.Duration, ok bool)
    func Batch[T any](ch <-chan T, size int) (collection []T, length int, readTime time.Duration, ok bool)
  • BufferWithTimeout

    BufferWithTimeout reads up to size items from a channel with timeout.

    ch := make(chan int)
    go func() {
    time.Sleep(200 * time.Millisecond)
    ch <- 1
    }()

    items, length, readTime, ok := lo.BufferWithTimeout(ch, 5, 100*time.Millisecond)
    // Returns empty slice due to timeout

    BatchWithTimeout (Deprecated)

    BatchWithTimeout is an alias for BufferWithTimeout.

    ch := make(chan float64)
    go func() {
    time.Sleep(150 * time.Millisecond)
    ch <- 3.14
    }()

    batch, length, readTime, ok := lo.BatchWithTimeout(ch, 3, 100*time.Millisecond)
    // Returns empty slice due to timeout
    Prototypes:
    func BufferWithTimeout[T any](ch <-chan T, size int, timeout time.Duration) (collection []T, length int, readTime time.Duration, ok bool)
    func BatchWithTimeout[T any](ch <-chan T, size int, timeout time.Duration) (collection []T, length int, readTime time.Duration, ok bool)
  • DispatchingStrategy

    DispatchingStrategyRoundRobin distributes messages to channels in round-robin order.

    strategy := lo.DispatchingStrategyRoundRobin[int]
    index := strategy(42, 0, []chan int{ch1, ch2, ch3})
    // Returns 0, then 1, then 2, then 0, cycling through channels

    DispatchingStrategyRandom distributes messages to a random channel.

    strategy := lo.DispatchingStrategyRandom[int]
    index := strategy(42, 0, []chan int{ch1, ch2, ch3})
    // Returns a random channel index 0, 1, or 2

    DispatchingStrategyWeightedRandom distributes messages to channels based on weights.

    weights := []int{1, 3, 6} // Channel 0: 10%, Channel 1: 30%, Channel 2: 60%
    strategy := lo.DispatchingStrategyWeightedRandom[int](weights)
    index := strategy(42, 0, []chan int{ch1, ch2, ch3})
    // Returns 2 most often, 1 sometimes, 0 rarely

    DispatchingStrategyFirst distributes messages to the first non-full channel.

    strategy := lo.DispatchingStrategyFirst[int]
    index := strategy(42, 0, []chan int{ch1, ch2, ch3})
    // Always returns 0 if ch1 is not full

    DispatchingStrategyLeast distributes messages to the channel with the fewest items.

    strategy := lo.DispatchingStrategyLeast[int]
    index := strategy(42, 0, []chan int{ch1, ch2, ch3})
    // Returns the index of the channel with the fewest queued items

    DispatchingStrategyMost distributes messages to the channel with the most items.

    strategy := lo.DispatchingStrategyMost[int]
    index := strategy(42, 0, []chan int{ch1, ch2, ch3})
    // Returns the index of the channel with the most queued items
    Prototypes:
    func DispatchingStrategyRoundRobin[T any](msg T, index uint64, channels []<-chan T) int
    func DispatchingStrategyRandom[T any](msg T, index uint64, channels []<-chan T) int
    func DispatchingStrategyWeightedRandom[T any](weights []int) DispatchingStrategy[T]
    func DispatchingStrategyFirst[T any](msg T, index uint64, channels []<-chan T) int
    func DispatchingStrategyLeast[T any](msg T, index uint64, channels []<-chan T) int
    func DispatchingStrategyMost[T any](msg T, index uint64, channels []<-chan T) int