diff --git a/client.go b/client.go index cee9ff3..7f1c29c 100644 --- a/client.go +++ b/client.go @@ -22,20 +22,32 @@ const ( type sqsOp int const ( - opSend sqsOp = iota - opDelete sqsOp = iota + opSend sqsOp = iota + opDelete sqsOp = iota + opChangeVisibility sqsOp = iota ) // SQSClient wraps *sqs.Client from aws-sdk-go-v2 type SQSClient interface { - SendMessageBatch(context.Context, *sqs.SendMessageBatchInput, ...func(*sqs.Options)) (*sqs.SendMessageBatchOutput, error) - DeleteMessageBatch(context.Context, *sqs.DeleteMessageBatchInput, ...func(*sqs.Options)) (*sqs.DeleteMessageBatchOutput, error) + SendMessageBatch(context.Context, + *sqs.SendMessageBatchInput, + ...func(*sqs.Options)) (*sqs.SendMessageBatchOutput, error) + DeleteMessageBatch(context.Context, + *sqs.DeleteMessageBatchInput, + ...func(*sqs.Options)) (*sqs.DeleteMessageBatchOutput, error) + ReceiveMessage(context.Context, + *sqs.ReceiveMessageInput, + ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error) + ChangeMessageVisibilityBatch(context.Context, + *sqs.ChangeMessageVisibilityBatchInput, + ...func(*sqs.Options)) (*sqs.ChangeMessageVisibilityBatchOutput, error) } // genericEntry for the lack of generics in Go. type genericEntry struct { - sendReq types.SendMessageBatchRequestEntry - delReq types.DeleteMessageBatchRequestEntry + sendReq types.SendMessageBatchRequestEntry + delReq types.DeleteMessageBatchRequestEntry + changeVisibilityReq types.ChangeMessageVisibilityBatchRequestEntry } // Config is used to configure BufferedClient. @@ -51,6 +63,10 @@ type Config struct { // Following fields are optional. + // SendBatchEnabled specifies that send message dispatcher will + // be enabled or not. If not specified, defaults to false. + SendBatchEnabled bool + // SendWaitTime specifies a time limit for how long the client will // wait before it will dispatch accumulated send message requests // even if the batch isn't full. If not specified, send message @@ -71,6 +87,10 @@ type Config struct { // needs to be goroutine safe. OnSendMessageBatch func(*sqs.SendMessageBatchOutput, error) + // DeleteBatchEnabled specifies that delete message dispatcher will + // be enabled or not. If not specified, defaults to false. + DeleteBatchEnabled bool + // DeleteWaitTime specifies a time limit for how long the client will // wait before it will dispatch accumulated delete message requests // even if the batch isn't full. If not specified, delete message @@ -90,24 +110,83 @@ type Config struct { // for a delete message batch operation. If set, this callback function // needs to be goroutine safe. OnDeleteMessageBatch func(*sqs.DeleteMessageBatchOutput, error) + + // ReceiveBatchEnabled specifies that receive message dispatcher will + // be enabled or not. If not specified, defaults to false. + ReceiveBatchEnabled bool + + // ReceiveWaitTime specifies a time limit for how long the client will + // wait before it will get response from receive message(s) requests + // event if the batch isn't full. If not specified, receive message + // request will be wait till the batch is full. + ReceiveWaitTime int32 + + // ReceiveVisibilityTimeout specifies a time limit for how long the message + // will be invisible for other consumers. If not specified, defaults to + // 0. + ReceiveVisibilityTimeout int32 + + // ReceiveBufferSize specifies a limit on the number of receive message + // request that can be held in memory. If not specified, defaults to + // 1000. + ReceiveBufferSize int + + // ReceiveConcurrency limits the number of concurrent receive message SQS + // requests in progress. If not specified, defaults to ReceiveBufferSize/10. + ReceiveConcurrency int + + // OnReceiveMessage will be called with results returned by SQSClient + // for receive message operation. If set, this callback function + // needs to be goroutine safe. + OnReceiveMessage func(*sqs.ReceiveMessageOutput, error) + + // ChangeVisibilityBatchEnabled specifies that change message visibility + // dispatcher will be enabled or not. If not specified, defaults to false. + ChangeVisibilityBatchEnabled bool + + // ChangeVisibilityWaitTime specifies a time limit for how long the + // client will wait before it will dispatch accumulated change message visibility + // requests even if the batch isn't full. If not specified, change message + // visibility requests will be dispatched only when a batch is full. + ChangeVisibilityWaitTime time.Duration + + // ChangeVisibilityBufferSize specifies a limit on the number of change + // message visibility requests that can be held in memory. If not specified, + // defaults to 1000. + ChangeVisibilityBufferSize int + + // ChangeVisibilityConcurrency limits the number of concurrent change + // message visibility SQS requests in progress. If not specified, defaults to + // ChangeVisibilityBufferSize/10. + ChangeVisibilityConcurrency int + + // OnChangeMessageVisibilityBatch will be called with results returned by + // SQSClient for a change message visibility batch operation. If set, this + // callback function needs to be goroutine safe. + OnChangeMessageVisibilityBatch func(*sqs.ChangeMessageVisibilityBatchOutput, error) } // Stats contains client statistics. type Stats struct { - MessagesSent uint64 - MessagesDeleted uint64 - SendMessageBatchCalls uint64 - DeleteMessageBatchCalls uint64 + MessagesSent uint64 + MessagesDeleted uint64 + MessagesReceived uint64 + MessagesVisibilityChanged uint64 + SendMessageBatchCalls uint64 + DeleteMessageBatchCalls uint64 + ReceiveMessageCalls uint64 + ChangeMessageVisibilityBatchCalls uint64 } // BufferedClient wraps aws-sdk-go-v2's sqs.Client to provide a async buffered client. type BufferedClient struct { Config - sendQueue chan genericEntry - deleteQueue chan genericEntry - batchers sync.WaitGroup - stopped bool - stats Stats + sendQueue chan genericEntry + deleteQueue chan genericEntry + changeVisibilityQueue chan genericEntry + batchers sync.WaitGroup + stopped bool + stats Stats } // NewBufferedClient creates and returns a new instance of BufferedClient. You @@ -136,17 +215,46 @@ func NewBufferedClient(config Config) (*BufferedClient, error) { } c.deleteQueue = make(chan genericEntry, c.DeleteBufferSize) + if c.ChangeVisibilityBufferSize <= 0 { + c.ChangeVisibilityBufferSize = defaultBufferSize + } + c.changeVisibilityQueue = make(chan genericEntry, c.ChangeVisibilityBufferSize) + + if c.ReceiveWaitTime < 0 { + c.ReceiveWaitTime = 0 + } + + if c.ReceiveWaitTime > 12*60*60 { + c.ReceiveWaitTime = 12 * 60 * 60 + } + if c.SendConcurrency < 1 { c.SendConcurrency = c.SendBufferSize / maxBatchSize } - c.batchers.Add(1) - go c.batcher(c.sendQueue, c.SendWaitTime, c.SendConcurrency, opSend, &c.batchers) + + if c.SendBatchEnabled { + c.batchers.Add(1) + go c.batcher(c.sendQueue, c.SendWaitTime, c.SendConcurrency, opSend, &c.batchers) + } if c.DeleteConcurrency < 1 { c.DeleteConcurrency = c.DeleteBufferSize / maxBatchSize } - c.batchers.Add(1) - go c.batcher(c.deleteQueue, c.DeleteWaitTime, c.DeleteConcurrency, opDelete, &c.batchers) + + if c.DeleteBatchEnabled { + c.batchers.Add(1) + go c.batcher(c.deleteQueue, c.DeleteWaitTime, c.DeleteConcurrency, opDelete, &c.batchers) + } + + if c.ChangeVisibilityConcurrency < 1 { + c.ChangeVisibilityConcurrency = c.ChangeVisibilityBufferSize / maxBatchSize + } + + if c.ChangeVisibilityBatchEnabled { + c.batchers.Add(1) + go c.batcher(c.changeVisibilityQueue, c.ChangeVisibilityWaitTime, c.ChangeVisibilityConcurrency, + opChangeVisibility, &c.batchers) + } return c, nil } @@ -162,6 +270,7 @@ func (c *BufferedClient) Stop() { close(c.sendQueue) close(c.deleteQueue) + close(c.changeVisibilityQueue) c.batchers.Wait() } @@ -169,10 +278,14 @@ func (c *BufferedClient) Stop() { // Stats returns client statistics. func (c *BufferedClient) Stats() Stats { s := Stats{ - MessagesSent: atomic.LoadUint64(&c.stats.MessagesSent), - MessagesDeleted: atomic.LoadUint64(&c.stats.MessagesDeleted), - SendMessageBatchCalls: atomic.LoadUint64(&c.stats.SendMessageBatchCalls), - DeleteMessageBatchCalls: atomic.LoadUint64(&c.stats.DeleteMessageBatchCalls), + MessagesSent: atomic.LoadUint64(&c.stats.MessagesSent), + MessagesReceived: atomic.LoadUint64(&c.stats.MessagesReceived), + MessagesDeleted: atomic.LoadUint64(&c.stats.MessagesDeleted), + MessagesVisibilityChanged: atomic.LoadUint64(&c.stats.MessagesVisibilityChanged), + SendMessageBatchCalls: atomic.LoadUint64(&c.stats.SendMessageBatchCalls), + ReceiveMessageCalls: atomic.LoadUint64(&c.stats.ReceiveMessageCalls), + DeleteMessageBatchCalls: atomic.LoadUint64(&c.stats.DeleteMessageBatchCalls), + ChangeMessageVisibilityBatchCalls: atomic.LoadUint64(&c.stats.ChangeMessageVisibilityBatchCalls), } return s } @@ -215,6 +328,22 @@ func (c *BufferedClient) DeleteMessageAsync(entries ...types.DeleteMessageBatchR return nil } +// ChangeMessageVisibilityAsync schedules message(s) which visibility needs to be +// change. It blocks if the change message visibility buffer is full. +func (c *BufferedClient) ChangeMessageVisibilityAsync(entries ...types.ChangeMessageVisibilityBatchRequestEntry) error { + if c.stopped { + return fmt.Errorf("client stopped") + } + + for _, entry := range entries { + c.changeVisibilityQueue <- genericEntry{ + changeVisibilityReq: entry, + } + } + + return nil +} + // batcher batches multiple send and delete requests to be dispatched in batches. func (c *BufferedClient) batcher(queue chan genericEntry, waitTime time.Duration, concurrency int, op sqsOp, wg *sync.WaitGroup) { defer wg.Done() @@ -297,6 +426,14 @@ func (c *BufferedClient) dispatchBatch(batch []genericEntry, op sqsOp) { entries = append(entries, ge.delReq) } c.deleteMessageBatch(entries) + case opChangeVisibility: + var arr [maxBatchSize]types.ChangeMessageVisibilityBatchRequestEntry + var entries = arr[:0] + + for _, ge := range batch { + entries = append(entries, ge.changeVisibilityReq) + } + c.changeMessageVisibilityBatch(entries) } } @@ -326,6 +463,42 @@ func (c *BufferedClient) deleteMessageBatch(entries []types.DeleteMessageBatchRe } } +func (c *BufferedClient) changeMessageVisibilityBatch(entries []types.ChangeMessageVisibilityBatchRequestEntry) { + resp, err := c.SQSClient.ChangeMessageVisibilityBatch(context.TODO(), &sqs.ChangeMessageVisibilityBatchInput{ + Entries: entries, + QueueUrl: aws.String(c.QueueURL), + }) + + atomic.AddUint64(&c.stats.ChangeMessageVisibilityBatchCalls, 1) + atomic.AddUint64(&c.stats.MessagesVisibilityChanged, uint64(len(entries))) + + if c.OnChangeMessageVisibilityBatch != nil { + c.OnChangeMessageVisibilityBatch(resp, err) + } +} + +func (c *BufferedClient) ReceiveMessages() { + resp, err := c.SQSClient.ReceiveMessage(context.TODO(), &sqs.ReceiveMessageInput{ + QueueUrl: aws.String(c.QueueURL), + MaxNumberOfMessages: maxBatchSize, + MessageAttributeNames: []string{ + string(types.QueueAttributeNameAll), + }, + ReceiveRequestAttemptId: nil, + VisibilityTimeout: aws.ToInt32(&c.ReceiveVisibilityTimeout), + WaitTimeSeconds: aws.ToInt32(&c.ReceiveWaitTime), + }) + + atomic.AddUint64(&c.stats.ReceiveMessageCalls, 1) + if err == nil { + atomic.AddUint64(&c.stats.MessagesReceived, uint64(len(resp.Messages))) + } + + if c.OnReceiveMessage != nil { + c.OnReceiveMessage(resp, err) + } +} + // disabledTicker returns a ticker that is stopped and shall never tick. func disabledTicker() *time.Ticker { ticker := time.NewTicker(1 * time.Hour)