diff --git a/internal/consumer/partitionConsumer.go b/internal/consumer/partitionConsumer.go index 4a043d1..3ee2d6e 100644 --- a/internal/consumer/partitionConsumer.go +++ b/internal/consumer/partitionConsumer.go @@ -401,20 +401,18 @@ func (p *rangePartitionConsumer) offsetLoop() { // ResetOffset will reset the consumer offset for the specified partition. func (p *rangePartitionConsumer) ResetOffset(offsetRange kafka.OffsetRange) error { - if offsetRange.HighOffset == -1 { - return errors.New("rangePartitionConsumer expects a highwatermark when resetting offset") - } - // If it has been closed, return error. select { - case _, ok := <-p.stopC: - if !ok { - return errors.New("unable to ResetOffset for rangePartitionConsumer has been closed") - } - case <-time.After(time.Nanosecond): + case <-p.stopC: + return errors.New("unable to ResetOffset for rangePartitionConsumer has been closed") + default: break } + if offsetRange.HighOffset == -1 { + return errors.New("rangePartitionConsumer expects a highwatermark when resetting offset") + } + select { case p.offsetRangeC <- offsetRange: return nil