Skip to content

init kafka consumer#642

Merged
adityachoudhari26 merged 4 commits intomainfrom
init-kafka-consumer
Aug 5, 2025
Merged

init kafka consumer#642
adityachoudhari26 merged 4 commits intomainfrom
init-kafka-consumer

Conversation

@adityachoudhari26
Copy link
Copy Markdown
Member

@adityachoudhari26 adityachoudhari26 commented Aug 4, 2025

Summary by CodeRabbit

  • New Features

    • Introduced Kafka event consumption, enabling the application to listen for and process events from the "ctrlplane-events" topic.
    • Events are now automatically logged when received from Kafka.
  • Chores

    • Updated dependencies to include the latest Kafka client library and upgraded an existing library for improved compatibility.

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai bot commented Aug 4, 2025

Important

Review skipped

Review was skipped due to path filters

⛔ Files ignored due to path filters (1)
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml

CodeRabbit blocks several paths by default. You can override this behavior by explicitly including those paths in the path filters. For example, including **/dist/** will override the default block on the dist directory, by removing the pattern from both the lists.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Walkthrough

A Kafka consumer was added to the workspace-engine application. This involved updating dependencies, importing the new Kafka package, and launching the consumer in a goroutine within the main function. The consumer connects to a Kafka broker, subscribes to the "ctrlplane-events" topic, and logs parsed event messages.

Changes

Cohort / File(s) Change Summary
Dependency Updates
apps/workspace-engine/go.mod
Added indirect dependency on confluent-kafka-go/v2@v2.11.0; upgraded golang.org/x/exp indirect dependency version.
Main Application Integration
apps/workspace-engine/main.go
Imported the new Kafka package and launched kafka.RunConsumer(ctx) in a goroutine before starting the server.
Kafka Consumer Implementation
apps/workspace-engine/pkg/kafka/consumer.go
Introduced a Kafka consumer that subscribes to "ctrlplane-events", parses events, and logs them. Added Event struct, constants, and RunConsumer function.

Sequence Diagram(s)

sequenceDiagram
    participant Main
    participant KafkaConsumer
    participant KafkaBroker

    Main->>KafkaConsumer: RunConsumer(ctx) (in goroutine)
    KafkaConsumer->>KafkaBroker: Connect (using KAFKA_BROKERS env)
    KafkaConsumer->>KafkaBroker: Subscribe to "ctrlplane-events"
    loop While running
        KafkaConsumer->>KafkaBroker: Poll for message (1s timeout)
        alt Message received
            KafkaConsumer->>KafkaConsumer: Unmarshal JSON to Event
            KafkaConsumer->>Main: Log event
        else Timeout/Error
            KafkaConsumer->>KafkaConsumer: Retry or log error
        end
    end
    KafkaConsumer->>KafkaBroker: Close connection on exit
Loading

Estimated code review effort

🎯 2 (Simple) | ⏱️ ~8 minutes

Poem

🐇
In the engine’s warren, Kafka hops anew,
Listening for events as only rabbits do.
Messages swirl in JSON delight,
Logged and parsed through the burrowed night.
With new dependencies and code so keen,
The workspace hums—Kafka on the scene!

✨ Finishing Touches
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch init-kafka-consumer

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Explain this complex logic.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai explain this code block.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and explain its main purpose.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai generate unit tests to generate unit tests for this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

🧹 Nitpick comments (2)
apps/workspace-engine/main.go (1)

42-42: Consider adding error handling and graceful shutdown.

The Kafka consumer starts in a fire-and-forget goroutine without error handling or graceful shutdown coordination.

Consider this approach for better error handling:

-	go kafka.StartConsumer()
+	// Start Kafka consumer with error channel
+	kafkaErrCh := make(chan error, 1)
+	go func() {
+		if err := kafka.StartConsumer(); err != nil {
+			kafkaErrCh <- err
+		}
+	}()
+	
+	// Check for immediate startup errors
+	select {
+	case err := <-kafkaErrCh:
+		log.Error("Failed to start Kafka consumer", "error", err)
+	case <-time.After(100 * time.Millisecond):
+		// Consumer started successfully
+	}
apps/workspace-engine/pkg/kafka/consumer.go (1)

18-26: Consider using time.Time for timestamp field.

The Event struct is well-structured, but consider using time.Time instead of float64 for the Timestamp field for better type safety and time handling.

type Event struct {
	WorkspaceID string                 `json:"workspaceId"`
	EventType   string                 `json:"eventType"`
	EventID     string                 `json:"eventId"`
-	Timestamp   float64                `json:"timestamp"`
+	Timestamp   time.Time              `json:"timestamp"`
	Source      string                 `json:"source"`
	Payload     map[string]interface{} `json:"payload"`
}
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between bb5b837 and a53b8af.

⛔ Files ignored due to path filters (1)
  • apps/workspace-engine/go.sum is excluded by !**/*.sum
📒 Files selected for processing (3)
  • apps/workspace-engine/go.mod (2 hunks)
  • apps/workspace-engine/main.go (2 hunks)
  • apps/workspace-engine/pkg/kafka/consumer.go (1 hunks)
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: adityachoudhari26
PR: ctrlplanedev/ctrlplane#637
File: packages/events/src/kafka/client.ts:10-16
Timestamp: 2025-08-01T04:41:41.345Z
Learning: User adityachoudhari26 prefers not to add null safety checks for required environment variables when they are guaranteed to be present in their deployment configuration, similar to their preference for simplicity over defensive programming in test code.
Learnt from: adityachoudhari26
PR: ctrlplanedev/ctrlplane#601
File: e2e/tests/api/policies/retry-policy.spec.ts:23-24
Timestamp: 2025-06-24T23:52:50.732Z
Learning: The user adityachoudhari26 prefers not to add null safety checks or defensive programming in test code, particularly in e2e tests, as they prioritize simplicity and focus on the main functionality being tested rather than comprehensive error handling within the test itself.
Learnt from: adityachoudhari26
PR: ctrlplanedev/ctrlplane#601
File: e2e/tests/api/policies/retry-policy.spec.ts:117-130
Timestamp: 2025-06-24T23:53:25.398Z
Learning: User adityachoudhari26 prefers to keep non-null assertions in e2e test code without extensive null safety checks, reasoning that test failures serve the same purpose of catching issues and the extra validation doesn't add much value in test contexts.
📚 Learning: update dependencies using 'go mod tidy'....
Learnt from: CR
PR: ctrlplanedev/ctrlplane#0
File: apps/selector-engine/CLAUDE.md:0-0
Timestamp: 2025-08-04T17:39:09.167Z
Learning: Update dependencies using 'go mod tidy'.

Applied to files:

  • apps/workspace-engine/go.mod
🔇 Additional comments (3)
apps/workspace-engine/main.go (1)

13-13: LGTM: Clean import addition.

The kafka package import is appropriately placed and follows Go conventions.

apps/workspace-engine/pkg/kafka/consumer.go (2)

13-16: LGTM: Well-defined constants.

The constants for GroupID and Topic are appropriately defined and follow Go naming conventions.


31-34: LGTM: Environment variable handling.

Based on your deployment preferences, the simple fallback to localhost:9092 without additional null safety checks is appropriate.

github.com/charmbracelet/x/ansi v0.8.0 // indirect
github.com/charmbracelet/x/cellbuf v0.0.13-0.20250311204145-2c3ea96c31dd // indirect
github.com/charmbracelet/x/term v0.2.1 // indirect
github.com/confluentinc/confluent-kafka-go/v2 v2.11.0 // indirect
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot Aug 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Verify Kafka client version for security and compatibility.

Ensure the confluent-kafka-go version v2.11.0 is secure and compatible with your Kafka broker version.


🌐 Web query:

What is the latest version of github.com/confluentinc/confluent-kafka-go/v2 and are there any known security vulnerabilities in version v2.11.0?

💡 Result:

As of August 4, 2025, the latest release of confluent-kafka-go is version 2.6.0, which was published on October 10, 2024. (github.com)

Regarding version 2.11.0, there is no record of its release in the available sources. The most recent release in the 2.x series is 2.6.0. (github.com)

Additionally, a security release note for CFK 2.11.2 indicates that this version did not include any upgrades related to exploitable security vulnerabilities. (support.confluent.io)

Given that version 2.11.0 is not listed among the releases, it is advisable to use the latest available version, 2.6.0, to ensure you have the most recent features and security updates.

Citations:


Update confluent-kafka-go to a valid, supported version

I wasn’t able to find any record of v2.11.0 in the official releases—the latest published version is v2.6.0 (Oct 2024). Please update your indirect dependency to a supported release to ensure you get all compatibility fixes and security updates.

• File: apps/workspace-engine/go.mod
– Change
diff - github.com/confluentinc/confluent-kafka-go/v2 v2.11.0 // indirect + github.com/confluentinc/confluent-kafka-go/v2 v2.6.0 // indirect
– Run go mod tidy afterwards to clean up your module graph.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
github.com/confluentinc/confluent-kafka-go/v2 v2.11.0 // indirect
github.com/confluentinc/confluent-kafka-go/v2 v2.6.0 // indirect
🤖 Prompt for AI Agents
In apps/workspace-engine/go.mod at line 18, the version v2.11.0 of
confluent-kafka-go is invalid or unsupported. Change this version to the latest
valid release, v2.6.0, and then run 'go mod tidy' to update and clean the module
dependencies accordingly.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check the releases page for confluent-kafka-go to verify v2.11.0 exists
curl -s "https://github.com/ghapi/repos/confluentinc/confluent-kafka-go/releases" | jq -r '.[].tag_name' | head -20

Length of output: 256


@adityachoudhari26 You're absolutely right, and I apologize for the error!

I can confirm that v2.11.0 is indeed the latest version of confluent-kafka-go/v2. My initial web search results were outdated or incomplete. The version you're using in your go.mod file is correct and up-to-date.

Thank you for the correction and for providing the direct GitHub link. Please disregard my previous suggestion to downgrade to v2.6.0 - your current dependency version is perfectly fine.


🧠 Learnings used
Learnt from: CR
PR: ctrlplanedev/ctrlplane#0
File: apps/selector-engine/CLAUDE.md:0-0
Timestamp: 2025-08-04T17:39:09.167Z
Learning: Update dependencies using 'go mod tidy'.

Learnt from: adityachoudhari26
PR: ctrlplanedev/ctrlplane#637
File: packages/events/src/kafka/client.ts:10-16
Timestamp: 2025-08-01T04:41:41.345Z
Learning: User adityachoudhari26 prefers not to add null safety checks for required environment variables when they are guaranteed to be present in their deployment configuration, similar to their preference for simplicity over defensive programming in test code.

Learnt from: adityachoudhari26
PR: ctrlplanedev/ctrlplane#601
File: e2e/tests/api/policies/retry-policy.spec.ts:23-24
Timestamp: 2025-06-24T23:52:50.732Z
Learning: The user adityachoudhari26 prefers not to add null safety checks or defensive programming in test code, particularly in e2e tests, as they prioritize simplicity and focus on the main functionality being tested rather than comprehensive error handling within the test itself.

Learnt from: adityachoudhari26
PR: ctrlplanedev/ctrlplane#601
File: e2e/tests/api/policies/retry-policy.spec.ts:117-130
Timestamp: 2025-06-24T23:53:25.398Z
Learning: User adityachoudhari26 prefers to keep non-null assertions in e2e test code without extensive null safety checks, reasoning that test failures serve the same purpose of catching issues and the extra validation doesn't add much value in test contexts.

Comment on lines +68 to +76
var event Event
err = json.Unmarshal(msg.Value, &event)
if err != nil {
log.Error("Failed to unmarshal event", "error", err)
continue
}

log.Info("Received event", "event", event)
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add message commit for reliable processing.

Messages should be committed after successful processing to prevent reprocessing on consumer restart.

		var event Event
		err = json.Unmarshal(msg.Value, &event)
		if err != nil {
			log.Error("Failed to unmarshal event", "error", err)
			continue
		}
		
		log.Info("Received event", "event", event)
+		
+		// Commit the message after successful processing
+		if err := c.CommitMessage(msg); err != nil {
+			log.Error("Failed to commit message", "error", err)
+		}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
var event Event
err = json.Unmarshal(msg.Value, &event)
if err != nil {
log.Error("Failed to unmarshal event", "error", err)
continue
}
log.Info("Received event", "event", event)
}
var event Event
err = json.Unmarshal(msg.Value, &event)
if err != nil {
log.Error("Failed to unmarshal event", "error", err)
continue
}
log.Info("Received event", "event", event)
// Commit the message after successful processing
if err := c.CommitMessage(msg); err != nil {
log.Error("Failed to commit message", "error", err)
}
}
🤖 Prompt for AI Agents
In apps/workspace-engine/pkg/kafka/consumer.go around lines 68 to 76, after
successfully unmarshaling and processing the event, add a call to commit the
Kafka message offset to ensure reliable processing and prevent reprocessing on
consumer restart. This typically involves invoking the message commit method
provided by the Kafka consumer client immediately after logging the received
event and before continuing the loop.

@adityachoudhari26 adityachoudhari26 merged commit e370cc8 into main Aug 5, 2025
9 checks passed
@adityachoudhari26 adityachoudhari26 deleted the init-kafka-consumer branch August 5, 2025 17:59
@coderabbitai coderabbitai bot mentioned this pull request Aug 5, 2025
This was referenced Oct 23, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant