This documentation is still new and evolving. If you spot any mistakes, unclear explanations, or missing details, please open an issue.
Your feedback helps us improve!
Core - Channel helpersโ
This page lists all operations on channels, available in the core package of lo.
ChannelDispatcherโ
ChannelDispatcher distributes messages from a stream to multiple channels based on a strategy.
stream := make(chan int, 100)
for i := 0; i < 100; i++ {
stream <- i
}
close(stream)
channels := lo.ChannelDispatcher(stream, 3, 10, lo.DispatchingStrategyRoundRobin[int])
// Returns 3 channels with round-robin distributionPrototype:func ChannelDispatcher[T any](stream <-chan T, count, channelBufferCap int, strategy DispatchingStrategy[T]) []<-chan T
SliceToChannelโ
SliceToChannel converts a slice to a channel with specified buffer size.
items := []int{1, 2, 3, 4, 5}
ch := lo.SliceToChannel(10, items)
for item := range ch {
fmt.Println(item)
}
// Prints 1, 2, 3, 4, 5Prototype:func SliceToChannel[T any](bufferSize int, collection []T) <-chan T
ChannelToSliceโ
ChannelToSlice converts a channel to a slice.
ch := make(chan int, 3)
ch <- 1
ch <- 2
ch <- 3
close(ch)
slice := lo.ChannelToSlice(ch)
// slice contains [1, 2, 3]Prototype:func ChannelToSlice[T any](ch <-chan T) []T
Generatorโ
Generator creates a channel from a generator function.
gen := lo.Generator(10, func(yield func(int)) {
for i := 0; i < 10; i++ {
yield(i * 2)
}
})
for item := range gen {
fmt.Println(item)
}
// Prints even numbers 0, 2, 4, 6, 8, 10, 12, 14, 16, 18Prototype:func Generator[T any](bufferSize int, generator func(yield func(T))) <-chan T
FanInโ
FanIn merges multiple upstream channels into a single downstream channel, reading from all input channels and forwarding their values to one output channel. The channelBufferCap parameter sets the buffer size of the output channel.
ch1 := make(chan int, 2)
ch2 := make(chan int, 2)
ch1 <- 1
ch1 <- 2
ch2 <- 3
ch2 <- 4
close(ch1)
close(ch2)
merged := lo.FanIn(10, ch1, ch2)
var result []int
for item := range merged {
result = append(result, item)
}
// result: []int{1, 2, 3, 4} (order may vary as goroutine scheduling differs)Prototype:func FanIn[T any](channelBufferCap int, upstreams ...<-chan T) <-chan T
ChannelMergeโ
ChannelMerge merges multiple channels into a single channel.
ch1 := make(chan int, 2)
ch2 := make(chan int, 2)
ch1 <- 1
ch1 <- 2
ch2 <- 3
ch2 <- 4
close(ch1)
close(ch2)
merged := lo.ChannelMerge(10, ch1, ch2)
var result []int
for item := range merged {
result = append(result, item)
}
// result contains [1, 2, 3, 4] (order may vary)Similar:Prototype:func ChannelMerge[T any](channelBufferCap int, upstreams ...<-chan T) <-chan T
FanOutโ
FanOut splits a single channel into multiple channels.
upstream := make(chan int, 6)
for i := 0; i < 6; i++ {
upstream <- i
}
close(upstream)
downstreams := lo.FanOut(3, 10, upstream)
// Returns 3 channels, each receiving 2 itemsPrototype:func FanOut[T any](count, channelsBufferCap int, upstream <-chan T) []<-chan T
Bufferโ
Buffer reads up to size items from a channel and returns them as a slice.
ch := make(chan int, 10)
for i := 1; i <= 5; i++ {
ch <- i
}
close(ch)
items, length, readTime, ok := lo.Buffer(ch, 3)
// items: []int{1, 2, 3}
// length: 3
// readTime: ~0s (immediate read from buffered channel)
// ok: true (channel was closed)Batch (Deprecated)
Batch is an alias for Buffer.
ch := make(chan string, 5)
ch <- "a"
ch <- "b"
ch <- "c"
close(ch)
batch, length, readTime, ok := lo.Batch(ch, 2)
// batch: []string{"a", "b"}
// length: 2
// readTime: ~0s (immediate read from buffered channel)
// ok: true (channel was closed)BufferWithContext
BufferWithContext reads up to size items from a channel with context cancellation.
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
ch := make(chan int)
go func() {
time.Sleep(50 * time.Millisecond)
ch <- 1
time.Sleep(100 * time.Millisecond)
ch <- 2
}()
items, length, readTime, ok := lo.BufferWithContext(ctx, ch, 5)
// items: []int{1} (only first item received before timeout)
// length: 1
// readTime: ~100ms (context timeout)
// ok: false (context cancelled)Variant:Similar:Prototypes:func Buffer[T any](ch <-chan T, size int) (collection []T, length int, readTime time.Duration, ok bool)
func BufferWithContext[T any](ctx context.Context, ch <-chan T, size int) (collection []T, length int, readTime time.Duration, ok bool)
func Batch[T any](ch <-chan T, size int) (collection []T, length int, readTime time.Duration, ok bool)BufferWithTimeoutโ
BufferWithTimeout reads up to size items from a channel with timeout.
ch := make(chan int)
go func() {
time.Sleep(200 * time.Millisecond)
ch <- 1
}()
items, length, readTime, ok := lo.BufferWithTimeout(ch, 5, 100*time.Millisecond)
// Returns empty slice due to timeoutBatchWithTimeout (Deprecated)
BatchWithTimeout is an alias for BufferWithTimeout.
ch := make(chan float64)
go func() {
time.Sleep(150 * time.Millisecond)
ch <- 3.14
}()
batch, length, readTime, ok := lo.BatchWithTimeout(ch, 3, 100*time.Millisecond)
// Returns empty slice due to timeoutVariant:Prototypes:func BufferWithTimeout[T any](ch <-chan T, size int, timeout time.Duration) (collection []T, length int, readTime time.Duration, ok bool)
func BatchWithTimeout[T any](ch <-chan T, size int, timeout time.Duration) (collection []T, length int, readTime time.Duration, ok bool)DispatchingStrategyโ
DispatchingStrategyRoundRobin distributes messages to channels in round-robin order.
strategy := lo.DispatchingStrategyRoundRobin[int]
index := strategy(42, 0, []chan int{ch1, ch2, ch3})
// Returns 0, then 1, then 2, then 0, cycling through channelsDispatchingStrategyRandom distributes messages to a random channel.
strategy := lo.DispatchingStrategyRandom[int]
index := strategy(42, 0, []chan int{ch1, ch2, ch3})
// Returns a random channel index 0, 1, or 2DispatchingStrategyWeightedRandom distributes messages to channels based on weights.
weights := []int{1, 3, 6} // Channel 0: 10%, Channel 1: 30%, Channel 2: 60%
strategy := lo.DispatchingStrategyWeightedRandom[int](weights)
index := strategy(42, 0, []chan int{ch1, ch2, ch3})
// Returns 2 most often, 1 sometimes, 0 rarelyDispatchingStrategyFirst distributes messages to the first non-full channel.
strategy := lo.DispatchingStrategyFirst[int]
index := strategy(42, 0, []chan int{ch1, ch2, ch3})
// Always returns 0 if ch1 is not fullDispatchingStrategyLeast distributes messages to the channel with the fewest items.
strategy := lo.DispatchingStrategyLeast[int]
index := strategy(42, 0, []chan int{ch1, ch2, ch3})
// Returns the index of the channel with the smallest buffer sizeDispatchingStrategyMost distributes messages to the channel with the most items.
strategy := lo.DispatchingStrategyMost[int]
index := strategy(42, 0, []chan int{ch1, ch2, ch3})
// Returns the index of the channel with the largest buffer sizeSimilar:Prototypes:func DispatchingStrategyRoundRobin[T any](msg T, index uint64, channels []<-chan T) int
func DispatchingStrategyRandom[T any](msg T, index uint64, channels []<-chan T) int
func DispatchingStrategyWeightedRandom[T any](weights []int) DispatchingStrategy[T]
func DispatchingStrategyFirst[T any](msg T, index uint64, channels []<-chan T) int
func DispatchingStrategyLeast[T any](msg T, index uint64, channels []<-chan T) int
func DispatchingStrategyMost[T any](msg T, index uint64, channels []<-chan T) int