Add SQS integration and report forwarder for filtered transactions#4620
Add SQS integration and report forwarder for filtered transactions#4620diegoximenes wants to merge 56 commits intoadd-report-filtered-txn-api-2from
Conversation
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Serialize FilteredTxReport as JSON and enqueue to SQS. The SQS client is defined as an interface in util/sqsclient for easy mocking in tests. Sensitive SQS credentials are masked in config dumps. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…point Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Tests exercise the full flow: API enqueues to mock SQS, forwarder consumes and forwards to httptest server. Covers success, endpoint failure, and empty queue scenarios. Also removes DLQ support for now. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…ing-report Separates report concerns from the transaction filterer service into the dedicated filtering-report service. Adds FilteringReportRPCClient using the filteringreport namespace for future execution engine integration. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Replace the plain ExternalEndpoint string with a reusable HTTPClientConfig (URL + Timeout) in cmd/genericconf, making the HTTP client timeout configurable instead of hardcoded at 30s. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Extract the forwarder into cmd/filtering-report/forwarder/ as a separate package. Bundle Workers and PollInterval into a WorkersConfig struct for cleaner configuration grouping. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Remove the nested WorkersConfig struct and move its fields directly into Config: Count is renamed to Workers, PollInterval is promoted. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…nt field Add WaitTimeSeconds as a config option (default 5) instead of hardcoding it, and remove the externalEndpoint field from Forwarder in favor of reading directly from config. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Real AWS SQS rejects MaxNumberOfMessages outside 1-10. The mock was silently defaulting 0 to 1, which could hide caller bugs. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Replace per-report Info log with single Debug log with count - Drain response body before close to enable HTTP connection reuse - Include response body in non-2xx error messages for debugging - Derive DefaultFilteringReportRPCClientConfig from DefaultClientConfig - Fix Forwarder struct field alignment Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Introduce sqsclient.QueueClient that wraps Client with a QueueURL field. NewClient now returns *QueueClient directly. All consumers (API, forwarder) take *QueueClient instead of separate (Client, queueURL) parameters. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Fail early if sqs.queue-url or external-endpoint.url are empty - Truncate error response body to 512 bytes to avoid huge error strings - Use strings.NewReader instead of bytes.NewBufferString for request body Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
FilteredTxReport is tagged with lint:require-exhaustive-initialization but the test was only setting TxHash. Initialize all fields to satisfy the structinit linter. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Align Workers koanf tag and flag name with field name, and add Enable guard in Start() so the field is referenced within its own package. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Tests were passing nil as the SQS client, causing ReportFilteredTransactions to fail with "SQS client not configured". Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
| } | ||
| if err := r.forwardToEndpoint(ctx, *msg.Body); err != nil { | ||
| log.Warn("Failed to forward report to external endpoint", "err", err, "messageId", msgID) | ||
| return 0 |
There was a problem hiding this comment.
In a future PR there will be an exponential back off retry mechanism.
We are also going to have a DLQ setup.
Rename the Queue interface to QueueClient and unexport the concrete struct (QueueClient → queueClient). NewQueueClient now returns the interface so consumers depend on the abstraction, not the implementation. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
MishkaRogachev
left a comment
There was a problem hiding this comment.
A few minor questions
| sqstypes "github.com/aws/aws-sdk-go-v2/service/sqs/types" | ||
| ) | ||
|
|
||
| type MockQueueClient struct { |
There was a problem hiding this comment.
suggestion: add error injection to test error paths in forwarder's pollAndForward.
There was a problem hiding this comment.
Can you describe an example on how this error injection can be used here?
| } | ||
| if msg.Body == nil { | ||
| log.Warn("Received SQS message with nil body, deleting", "messageId", msgID) | ||
| if err = r.queueClient.Delete(ctx, *msg.ReceiptHandle); err != nil { |
There was a problem hiding this comment.
Same as msg.Body, msg.ReceiptHandle could be nil
There was a problem hiding this comment.
AWS SDK guarantees that Body, RequestId and MessageId are not nil.
So I removed the over protective nil checks here.
| } | ||
|
|
||
| func (c *Config) Validate() error { | ||
| return c.ExternalEndpoint.Validate() |
There was a problem hiding this comment.
nit: add Workers > 0 check
There was a problem hiding this comment.
I updated some types in this config here, using uint instead of int for Workers for example.
Zero is valid though, which can be used to disable the forwarding feature.
|
|
||
| type QueueClient interface { | ||
| Send(ctx context.Context, body string) error | ||
| Receive(ctx context.Context, waitTimeSecs, maxMessages int32) ([]sqstypes.Message, error) |
There was a problem hiding this comment.
Can you add package-local QueueMessage type to keep sqstypes.Message only in implementation?
There was a problem hiding this comment.
The goal of QueueClient is not to abstract SQS away, e.g., enable another message queue other than SQS to be used under the hood.
We could definitely do that, but the ROI of doing that right now is pretty low, it is an early optimization.
sqs.Client is not attached to a single QueueURL, and our current use case is to use it with a single queue url.
The current goal of QueueClient is just to encapsulate the QueueURL and the sqs.Client in the same object.
Seems OK for exposing the SQS types here right now.
Makes sense?
|
LGTM overall. very minor comments. I also think we need to validate workers == 1 for now till we support mutliple workers in future. |
Body, MessageId, and ReceiptHandle are always populated by SQS ReceiveMessage, so the defensive nil checks in pollAndForward were over-protective and inconsistent (ReceiptHandle was already dereferenced unguarded). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Guarantees nonnegativity at the type level instead of requiring runtime validation. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
A negative PollInterval would make the retry backoff meaningless, and a negative WaitTimeSeconds is rejected by SQS on every ReceiveMessage call. Catch both at config load. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Clarifies that this config controls the SQS ReceiveMessage long-polling wait time, distinct from any HTTP or scheduling timeout in the forwarder. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Log response-body drain and read errors at Warn instead of silently dropping them, and stop reassigning the outer err from inside the defer so the control flow is obvious. resp.Body.Close() is left as-is since net/http effectively never returns a non-nil error there. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Multiple workers are already supported |
| return err | ||
| } | ||
| bodyStr := string(body) | ||
| err = a.queueClient.Send(ctx, bodyStr) |
There was a problem hiding this comment.
I'm wondering if it is possible to use sqs's SendMessageBatch here instead of loop + Send
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Resolves NIT-4672
Attention to the base branch
Summary
ReportFilteredTransactionsRPC endpoint that publishes filtered transaction reports to SQSutil/sqsclient) with interface, config, and mock client for testingHTTPClientConfigtogenericconffor configurable HTTP client settingsFuture work
🤖 Generated with Claude Code