Donald Le is a quality engineer in software with 9 years of experience.
Event-driven architecture (EDA) helps you deliver better real-time user experiences and decouple your microservices, which improves your ability to adapt and innovate. Many tools can help you implement event-driven architecture, such as Apache Kafka, RabbitMQ, and Redis. Unfortunately, developing event-driven systems with these tools involves a steep learning curve, especially when it comes to setting up the system to scale and handle an increasing workload. Troubleshooting them also takes a lot of effort, since you need to dig through the ocean of logs that the application creates.
Solace PubSub+ Platform aims to address these pain points. With Solace, you do not have to worry about whether your application is scalable, or spend days or weeks debugging problems related to message queues. Solace PubSub+ Event Broker comes in three different form factors depending on your needs: hardware, software, or a cloud offering that’s available as a managed solution on different cloud providers like AWS, Google Cloud, or Azure, and self-managed in your own private cloud. Another element of the platform, Solace PubSub+ Event Portal, includes tools like Discovery and Insights that can help you design, catalog, and manage all of the events moving through your system.
In this article, I will explain how to build an event-driven application in the Go programming language using Solace so you better understand how an event-driven application works, and how Solace acts as a Swiss army knife to solve all the hardest parts of the job for us.
Overview of the Application
The application will have two services:
- Payment service
- Notification service
When a user submits a payment to the system, the payment service checks whether that payment is valid. If it is, the payment service confirms the payment and then sends a message to Solace. The notification service subscribes to a Solace topic, so any message sent to that topic is delivered to the notification service too. If the message matches the expected format, a notification is sent to the customer service team via Slack so that the team can prepare the purchased items and send them to the user.
This diagram shows how the application works.
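The flow described above can be sketched in a few lines of Go before any real broker is involved. This is only a toy, in-memory illustration: the `Broker` and `Event` types, the `confirmPayment` function, and the `payments/confirmed` topic name are all hypothetical stand-ins, not part of Solace or of the services built later in this article.

```go
package main

import "fmt"

// Event is a hypothetical stand-in for a message sent through the broker.
type Event struct {
	Topic   string
	Payload string
}

// Broker is a toy in-memory broker: it hands each published event to
// every handler registered for that exact topic.
type Broker struct {
	subscribers map[string][]func(Event)
}

// NewBroker returns an empty broker.
func NewBroker() *Broker {
	return &Broker{subscribers: make(map[string][]func(Event))}
}

// Subscribe registers a handler for a topic.
func (b *Broker) Subscribe(topic string, handler func(Event)) {
	b.subscribers[topic] = append(b.subscribers[topic], handler)
}

// Publish delivers the event to all handlers subscribed to its topic.
func (b *Broker) Publish(e Event) {
	for _, handler := range b.subscribers[e.Topic] {
		handler(e)
	}
}

// confirmPayment plays the role of the payment service:
// only valid payments produce an event.
func confirmPayment(b *Broker, paymentID string, valid bool) {
	if !valid {
		return
	}
	b.Publish(Event{Topic: "payments/confirmed", Payload: paymentID})
}

func main() {
	broker := NewBroker()

	// The notification service only knows the topic, not the payment service.
	broker.Subscribe("payments/confirmed", func(e Event) {
		fmt.Println("notify customer service about payment", e.Payload)
	})

	confirmPayment(broker, "pi_123", true)
	confirmPayment(broker, "pi_456", false) // invalid: no event, no notification
}
```

The point of the sketch is the decoupling: the publisher and the subscriber share nothing except the topic name, which is the role the Solace broker plays in the real application.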
Prerequisites
In order to follow along with the demonstration in the article, you need:
- A Solace PubSub+ Cloud trial account with a Solace event broker already created, to store and forward messages from the payment service to the notification service
- A Slack account with an application created under that account, to send messages to the Slack channel from the notification service
- Git installed on your machine, to clone code from GitHub
- Go version 1.13 or above, with Go modules enabled
- A Stripe account with API keys for the test environment, to set up the payment service
- SSH access to GitHub set up, to conveniently work with GitHub from your local machine
- Postman installed, to interact with the payment service API
Build the Application
I will first explain how to build the payment service, then the notification service. Finally, I’ll explain how to set up the pub/sub mechanism for payment service and notification service in order for them to integrate with each other.
Payment Service
In this section I will go over how to set up and interact with the payment service.
Set up the Payment Service
The payment service is powered by Stripe. First, clone this GitHub repository onto your machine and check out the `core` branch.

```bash
cd ~
mkdir Projects
cd Projects
git clone https://github.com/cuongld2/shop-service.git
cd shop-service
git checkout core
```
Then create a file named `config.json` at the project’s root to store the configuration and Stripe credentials.
```bash
nano config.json
```
Copy the following content into the `config.json` file, replacing the placeholders with the credentials of your Stripe account:

```json
{
  "stripe": {
    "api_keys": {
      "EUR": {
        "pk_key": "${pk_key}",
        "sk_key": "${sk_key}"
      },
      "CAD": {
        "pk_key": "${pk_key}",
        "sk_key": "${sk_key}"
      },
      "default": {
        "pk_key": "${pk_key}",
        "sk_key": "${sk_key}"
      }
    }
  },
  "server": {
    "protocol": "http://",
    "domain": "localhost",
    "port": "8080"
  }
}
```
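To make the shape of this file concrete, here is a minimal sketch of how a Go program could parse it. The type and function names (`appConfig`, `parseConfig`, `keysFor`, `baseURL`) are hypothetical and are not taken from the shop-service repository. Falling back to the `default` entry when a currency has no keys of its own is an assumption about the intended behavior.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// apiKeys holds one publishable/secret key pair from config.json.
type apiKeys struct {
	PK string `json:"pk_key"`
	SK string `json:"sk_key"`
}

// appConfig mirrors the structure of the config.json file shown above.
type appConfig struct {
	Stripe struct {
		APIKeys map[string]apiKeys `json:"api_keys"`
	} `json:"stripe"`
	Server struct {
		Protocol string `json:"protocol"`
		Domain   string `json:"domain"`
		Port     string `json:"port"`
	} `json:"server"`
}

// parseConfig decodes the raw JSON configuration.
func parseConfig(raw []byte) (appConfig, error) {
	var cfg appConfig
	err := json.Unmarshal(raw, &cfg)
	return cfg, err
}

// keysFor returns the keys for a currency, falling back to the "default"
// entry (assumed behavior, not confirmed against the repository).
func (c appConfig) keysFor(currency string) (apiKeys, bool) {
	if k, ok := c.Stripe.APIKeys[currency]; ok {
		return k, true
	}
	k, ok := c.Stripe.APIKeys["default"]
	return k, ok
}

// baseURL joins the server settings into the address the service listens on.
func (c appConfig) baseURL() string {
	return c.Server.Protocol + c.Server.Domain + ":" + c.Server.Port
}

func main() {
	raw := []byte(`{
		"stripe": {"api_keys": {"default": {"pk_key": "pk_placeholder", "sk_key": "sk_placeholder"}}},
		"server": {"protocol": "http://", "domain": "localhost", "port": "8080"}
	}`)
	cfg, err := parseConfig(raw)
	if err != nil {
		panic(err)
	}
	_, found := cfg.keysFor("EUR")
	fmt.Println("keys available for EUR:", found)
	fmt.Println("service address:", cfg.baseURL())
}
```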
Below is the Stripe page where I get the test API keys. Your Stripe page should look similar.
Once the `config.json` file is in place, run the commands below to bring up the payment service:

```bash
# Install the needed dependencies
go mod download
# Remove unneeded dependencies from go.mod and go.sum, if necessary
go mod tidy
# Bring up the payment service
go run main.go -config=config.json
```
If you do not see any errors, you are ready to move on to interacting with the payment service API. If you do, the likely cause is incorrect credentials for your Stripe account. Note that you need to use the Stripe test API keys instead of the live ones.
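One quick way to catch the wrong kind of key before starting the service is to check its prefix: Stripe test-mode secret and publishable keys begin with `sk_test_` and `pk_test_`. The helper below is a hypothetical sketch, not part of the shop-service code, and the sample keys are made-up placeholders.

```go
package main

import (
	"fmt"
	"strings"
)

// isStripeTestKey reports whether key looks like a Stripe test-mode key,
// based only on its prefix.
func isStripeTestKey(key string) bool {
	return strings.HasPrefix(key, "sk_test_") || strings.HasPrefix(key, "pk_test_")
}

func main() {
	// Placeholder values for illustration only.
	for _, key := range []string{"sk_test_placeholder", "sk_live_placeholder"} {
		fmt.Printf("%s is a test key: %v\n", key, isStripeTestKey(key))
	}
}
```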
Interact with the Payment Service
First, try to create the payment as a regular user would do.
Open Postman and set up a `POST` request with the following details:
- `{{BASE_URL}}`: http://localhost:8080
- Method: POST
- Path: `{{BASE_URL}}/payment_intents`
- Body (x-www-form-urlencoded): `currency` is EUR, `amount` is 5000, and `payment_source` is pm_card_visa
Run the request. You should see that the `response_status_code` of the API is 200, with response content like the following:
After the payment is created, the payment service needs to check whether it is valid, so send an API request to confirm the payment. The details of this request are:
- `{{BASE_URL}}`: http://localhost:8080
- `{{PAYMENT_ID}}`: the value of `gateway_reference` you got from the create payment API
- Method: POST
- Path: `{{BASE_URL}}/payment_intents/{{PAYMENT_ID}}/confirm`
- Body (x-www-form-urlencoded): `currency` is EUR
Run the request. You should see that the `response_status_code` of the API is 200, with response content like the following:
Now that you have successfully built the payment service for the app, it’s time to build the notification service.
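If you would rather script the two Postman requests from this section, the sketch below builds equivalent requests with Go's standard library. The endpoints and form fields come from the request details above. The helper names (`newFormRequest`, `newCreatePaymentRequest`, `newConfirmPaymentRequest`) are hypothetical, and the example only builds the requests rather than sending them.

```go
package main

import (
	"fmt"
	"net/http"
	"net/url"
	"strings"
)

// baseURL matches the {{BASE_URL}} value used in Postman.
const baseURL = "http://localhost:8080"

// newFormRequest builds a POST request with an x-www-form-urlencoded body.
func newFormRequest(endpoint string, form url.Values) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodPost, endpoint, strings.NewReader(form.Encode()))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
	return req, nil
}

// newCreatePaymentRequest mirrors the "create payment" request.
func newCreatePaymentRequest(currency, amount, source string) (*http.Request, error) {
	form := url.Values{}
	form.Set("currency", currency)
	form.Set("amount", amount)
	form.Set("payment_source", source)
	return newFormRequest(baseURL+"/payment_intents", form)
}

// newConfirmPaymentRequest mirrors the "confirm payment" request.
// paymentID is the gateway_reference returned by the create request.
func newConfirmPaymentRequest(paymentID, currency string) (*http.Request, error) {
	form := url.Values{}
	form.Set("currency", currency)
	endpoint := baseURL + "/payment_intents/" + url.PathEscape(paymentID) + "/confirm"
	return newFormRequest(endpoint, form)
}

func main() {
	req, err := newCreatePaymentRequest("EUR", "5000", "pm_card_visa")
	if err != nil {
		panic(err)
	}
	fmt.Println(req.Method, req.URL)

	// To actually send it while the payment service is running:
	//   resp, err := http.DefaultClient.Do(req)
}
```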
Notification Service
The notification service is responsible for sending a message to a Slack channel after it receives the payment confirmation message from the payment service via the Solace broker. When the customer service team receives the message in the Slack channel, they can prepare the items that the customer has bought and ship them.
Set up the Slack Application
To send messages to the Slack channel, you need to create a Slack application and a bot token that has the authorization scope for sending messages. For details on how to create a Slack application and generate a bot token, refer to this doc from Slack.
Set the bot token
The Slack page that shows the bot token should look like the below:
Set the authorization scope
Scrolling down the page a little bit, you can set up the authorization scope. Below is my bot token authorization scope.
Note: For the notification service to work, you only need to set the authorization scope named `chat:write`. Other scopes are not needed.
Find the channel id of your Slack channel
In order to send the message to the Slack channel that the customer service team is using, you need its `channel_id`. The easiest way to find it is to open the Slack channel in a browser and take the channel id from the Slack URL.
For my channel, the channel id is `C03EJ6VUTKL`, placed at the end of the Slack URL.
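Since the channel id is the last path segment of the URL, extracting it can be sketched as below. The function name `channelIDFromURL` is hypothetical, and the workspace id `T00000000` in the example URL is a made-up placeholder. Only the channel id comes from this article.

```go
package main

import (
	"fmt"
	"net/url"
	"path"
)

// channelIDFromURL returns the last path segment of a Slack channel URL,
// which is where the channel id sits.
func channelIDFromURL(rawURL string) (string, error) {
	u, err := url.Parse(rawURL)
	if err != nil {
		return "", err
	}
	id := path.Base(u.Path)
	if id == "." || id == "/" {
		return "", fmt.Errorf("no channel id found in %q", rawURL)
	}
	return id, nil
}

func main() {
	// T00000000 is a placeholder workspace id.
	id, err := channelIDFromURL("https://app.slack.com/client/T00000000/C03EJ6VUTKL")
	if err != nil {
		panic(err)
	}
	fmt.Println(id) // prints "C03EJ6VUTKL"
}
```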
That’s it for the Slack setup. Now run the notification service to make sure a message is sent to the Slack channel.
Run the Notification Service
Clone the code from this GitHub repository and check out the `core` branch.

```bash
cd ..
git clone https://github.com/cuongld2/notification-service.git
cd notification-service
git checkout core
export BOT_TOKEN=${your_bot_token}
export CHANNEL_ID=${your_slack_channel_id}
# Download the dependency for working with the Slack API
go mod download
# Clean up the project
go mod tidy
# Run the service
go run main.go
```
After running these commands, you should be able to receive the message in your Slack channel with the content “Hello Customer Service Team”.
Now that the payment service and notification service are up and running, implement the EDA so that when the user’s payment is confirmed, a new message is sent to the Slack channel.
Set up the Solace Event Broker Service
Details for how to set up the Solace broker service can be found in this doc. Below is the Solace page that shows the status of my Solace broker named “payment-broker”:
To connect to the broker service from the payment service and notification service, you need to know the broker service’s `TransportLayerPropertyHost`, `ServicePropertyVPNName`, `AuthenticationPropertySchemeBasicUserName`, and `AuthenticationPropertySchemeBasicPassword`. To find them, click on the `Connect` tab from the status page above. You should see a screen similar to the one below, which shows a number of options for connecting to the Solace broker.
From this page, choose the “Solace Message” option, then choose “Solace Go API” for the client library.
Skip the “Get API” and “Learn more” steps for now, and click on the “Connect to service” tab.
Copy the values for the broker service’s `TransportLayerPropertyHost`, `ServicePropertyVPNName`, `AuthenticationPropertySchemeBasicUserName`, and `AuthenticationPropertySchemeBasicPassword` to somewhere safe. You will use these values later on to connect to the broker service.
That’s it for setting up Solace broker. Now it’s time to send messages from payment service to the Solace broker via the Solace topic.
Adding Step to Send Messages from Payment Service to Broker Service
First, go to the folder that contains the code for the payment service, then add the Solace messaging library to the `go.mod` file.

```bash
cd ..
cd shop-service
nano go.mod
```
Add this dependency to the `require` block. You need the Solace messaging dependency to connect to the Solace broker and send messages to it.

```
solace.dev/go/messaging v1.1.0
```
Open the file named `intentconfirm.go` in the folder `payment/intent/confirm`.

```bash
cd payment/intent/confirm
nano intentconfirm.go
```
Delete the current content and add the below content to it.
```go
package apppaymentintentconfirm

import (
	"errors"
	"fmt"
	"math/rand"
	"os"
	"time"

	appconfig "shopping-service.com/m/config"
	appcurrency "shopping-service.com/m/currency"
	apperror "shopping-service.com/m/error"
	apppaymentintent "shopping-service.com/m/payment/intent"

	"solace.dev/go/messaging"
	"solace.dev/go/messaging/pkg/solace/config"
	"solace.dev/go/messaging/pkg/solace/message"
	"solace.dev/go/messaging/pkg/solace/resource"

	"github.com/stripe/stripe-go"
	"github.com/stripe/stripe-go/paymentintent"
)

// TopicPrefix is the root of every topic the payment service publishes to.
const TopicPrefix = "events/payment-service"

// MessageHandler dumps an inbound message, which is handy for debugging.
func MessageHandler(message message.InboundMessage) {
	fmt.Printf("Message Dump %s \n", message)
}

// getEnv returns the value of the environment variable key, or def if it is unset.
func getEnv(key, def string) string {
	if val, ok := os.LookupEnv(key); ok {
		return val
	}
	return def
}

// randomString returns a random hexadecimal string of the given length.
func randomString(length int) string {
	rand.Seed(time.Now().UnixNano())
	b := make([]byte, length)
	rand.Read(b)
	return fmt.Sprintf("%x", b)[:length]
}

// Confirm confirms the payment intent with the given id using the Stripe
// account configured for currency c, then publishes an event to the broker.
func Confirm(id string, c appcurrency.Currency) (apppaymentintent.Intent, error) {
	if id == "" || c == nil {
		return nil, errors.New("impossible to confirm the payment intent without required parameters")
	}

	sck, e := appconfig.GetStripeAPIConfigByCurrency(c.GetISO4217())
	if e != nil {
		return nil, e
	}

	stripe.Key = sck.GetSK()
	intent, e := paymentintent.Confirm(id, nil)
	if e != nil {
		m, es := apperror.GetStripeErrorMessage(e)
		if es == nil {
			return nil, errors.New(m)
		}
		return nil, e
	}

	// Send message to Solace broker
	// Configuration parameters
	brokerConfig := config.ServicePropertyMap{
		config.TransportLayerPropertyHost:                getEnv("TransportLayerPropertyHost", "tcps://"),
		config.ServicePropertyVPNName:                    getEnv("ServicePropertyVPNName", "brokername"),
		config.AuthenticationPropertySchemeBasicUserName: getEnv("AuthenticationPropertySchemeBasicUserName", "clientName"),
		config.AuthenticationPropertySchemeBasicPassword: getEnv("AuthenticationPropertySchemeBasicPassword", "password"),
	}
	messagingService, err := messaging.NewMessagingServiceBuilder().
		FromConfigurationProvider(brokerConfig).
		WithTransportSecurityStrategy(config.NewTransportSecurityStrategy().WithoutCertificateValidation()).
		Build()
	if err != nil {
		panic(err)
	}

	// Connect to the messaging service
	if err := messagingService.Connect(); err != nil {
		panic(err)
	}
	fmt.Println("Connected to the broker? ", messagingService.IsConnected())

	// Build a Direct Message Publisher
	directPublisher, builderErr := messagingService.CreateDirectMessagePublisherBuilder().Build()
	if builderErr != nil {
		panic(builderErr)
	}
	startErr := directPublisher.Start()
	if startErr != nil {
		panic(startErr)
	}
	fmt.Println("Direct Publisher running? ", directPublisher.IsRunning())

	// Prepare the outbound message payload and properties
	messageBody := "Payment intent confirmed has id is : "
	messageBuilder := messagingService.MessageBuilder().
		WithProperty("application", "samples").
		WithProperty("language", "go")
	fmt.Println("Publishing to topics under", TopicPrefix+"/")

	// Random placeholders standing in for real product and payment ids
	productId := randomString(5)
	paymentId := randomString(6)
	if directPublisher.IsReady() {
		message, err := messageBuilder.BuildWithStringPayload(messageBody + id)
		if err != nil {
			panic(err)
		}
		publishErr := directPublisher.Publish(message, resource.TopicOf(TopicPrefix+"/"+productId+"/"+c.GetISO4217()+"/"+"pm_card_visa/"+paymentId+"/"))
		if publishErr != nil {
			panic(publishErr)
		}
	}

	return apppaymentintent.FromStripeToAppIntent(*intent), nil
}
```

The code above connects to the broker service and sends a message from the payment service after the payment is confirmed. Below are the lines of code that send the message to the broker service:

```go
messageBody := "Payment intent confirmed has id is : "
messageBuilder := messagingService.MessageBuilder().
	WithProperty("application", "samples").
	WithProperty("language", "go")
fmt.Println("Publishing to topics under", TopicPrefix+"/")

productId := randomString(5)
paymentId := randomString(6)
if directPublisher.IsReady() {
	message, err := messageBuilder.BuildWithStringPayload(messageBody + id)
	if err != nil {
		panic(err)
	}
	publishErr := directPublisher.Publish(message, resource.TopicOf(TopicPrefix+"/"+productId+"/"+c.GetISO4217()+"/"+"pm_card_visa/"+paymentId+"/"))
	if publishErr != nil {
		panic(publishErr)
	}
}
```
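Here the message is sent with the content “Payment intent confirmed has id is : ” followed by the ID of the confirmed payment intent, to the topic `events/payment-service/{productId}/{currency}/pm_card_visa/{paymentId}/`. In this demo, `productId` and `paymentId` are randomly generated strings.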
Note: Later on in the notification service, you will filter the topic subscription to match the topic path that the payment service publishes. This is because, in a real-world project, the notification service will receive a lot of messages from the broker service, so it needs a matching mechanism to send the correct notification to the correct Slack channel. To do so, you will leverage Solace’s wildcards to filter messages efficiently at the broker level and avoid straining the notification service with traffic that it doesn’t need. Read more about Solace wildcards here.
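To build intuition for how wildcard subscriptions select topics, here is a simplified matcher written in plain Go. It is an approximation for illustration only, not the broker's implementation: it assumes `*` at the end of a level matches the rest of that level, and `>` as the last level matches one or more remaining levels. The function name `matches` and the sample ids are hypothetical. Check the Solace wildcard documentation for the exact rules.

```go
package main

import (
	"fmt"
	"strings"
)

// matches reports whether topic is selected by subscription under the
// simplified rules described above.
func matches(subscription, topic string) bool {
	sub := strings.Split(subscription, "/")
	top := strings.Split(topic, "/")
	for i, level := range sub {
		// ">" in the last position needs at least one more topic level.
		if level == ">" && i == len(sub)-1 {
			return len(top) > i
		}
		if i >= len(top) {
			return false
		}
		// "prefix*" matches any level that starts with prefix.
		if strings.HasSuffix(level, "*") {
			if !strings.HasPrefix(top[i], strings.TrimSuffix(level, "*")) {
				return false
			}
			continue
		}
		if level != top[i] {
			return false
		}
	}
	return len(sub) == len(top)
}

func main() {
	// ab12c and f00ba4 stand in for the random product and payment ids.
	topic := "events/payment-service/ab12c/EUR/pm_card_visa/f00ba4/"
	fmt.Println(matches("events/payment-service/*/EUR/pm_card_visa/>", topic)) // true
	fmt.Println(matches("events/payment-service/*/USD/pm_card_visa/>", topic)) // false
}
```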
Now, bring up the payment service with the updated code. Remember to export the environment variables for the `TransportLayerPropertyHost`, `ServicePropertyVPNName`, `AuthenticationPropertySchemeBasicUserName`, and `AuthenticationPropertySchemeBasicPassword` values of the broker service that you saved earlier.

```bash
cd ../../..
export TransportLayerPropertyHost=${your_transport_layer_value}
export ServicePropertyVPNName=${your_service_property_value}
export AuthenticationPropertySchemeBasicUserName=${your_authen_user_name_value}
export AuthenticationPropertySchemeBasicPassword=${your_authen_password_value}
# Download the new dependencies for Solace messaging
go mod download
# Clean the project
go mod tidy
# Bring up the payment service
go run main.go -config=config.json
```
After running these commands, the payment service should be successfully up and running. The next step is to subscribe to messages sent from the payment service via Solace broker.
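Because `getEnv` in the code above falls back to placeholder defaults such as `tcps://`, a forgotten `export` only shows up later as a connection failure. A small preflight check can surface it earlier. The sketch below uses only the standard library, and the names `requiredBrokerVars` and `missingVars` are hypothetical.

```go
package main

import (
	"fmt"
	"os"
)

// requiredBrokerVars lists the environment variables both services read.
var requiredBrokerVars = []string{
	"TransportLayerPropertyHost",
	"ServicePropertyVPNName",
	"AuthenticationPropertySchemeBasicUserName",
	"AuthenticationPropertySchemeBasicPassword",
}

// missingVars returns the keys that are unset or empty according to lookup.
// Passing the lookup function in keeps the check easy to test.
func missingVars(keys []string, lookup func(string) (string, bool)) []string {
	var missing []string
	for _, key := range keys {
		if val, ok := lookup(key); !ok || val == "" {
			missing = append(missing, key)
		}
	}
	return missing
}

func main() {
	if missing := missingVars(requiredBrokerVars, os.LookupEnv); len(missing) > 0 {
		fmt.Println("missing broker settings:", missing)
		os.Exit(1)
	}
	fmt.Println("all broker settings are present")
}
```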
Adding Step to Receive Messages from Payment Service via Broker Service
First, add the Solace messaging dependency to the `go.mod` file in the notification service code.

```bash
cd ..
cd notification-service
nano go.mod
```

Add the line for the Solace messaging dependency to the `go.mod` file as below:

```
require solace.dev/go/messaging v1.1.0
```
The next step is to remove the current content of the `main.go` file.

```bash
nano main.go
# Remove the content from the file now
```

Then add these lines to it.
```go
package main

import (
	"fmt"
	"os"
	"os/signal"

	"github.com/slack-go/slack"
	"solace.dev/go/messaging"
	"solace.dev/go/messaging/pkg/solace/config"
	"solace.dev/go/messaging/pkg/solace/message"
	"solace.dev/go/messaging/pkg/solace/resource"
)

// TopicPrefix is the root of every topic the payment service publishes to.
const TopicPrefix = "events/payment-service"

// payloadOf extracts the message body whether it was sent as a string or as bytes.
func payloadOf(msg message.InboundMessage) string {
	if payload, ok := msg.GetPayloadAsString(); ok {
		return payload
	}
	if payload, ok := msg.GetPayloadAsBytes(); ok {
		return string(payload)
	}
	return ""
}

// notifySlack posts a currency-specific headline followed by the message body.
func notifySlack(msg message.InboundMessage, currency string) {
	messageBody := payloadOf(msg)
	fmt.Printf("Received Message Body %s \n", messageBody)

	api := slack.New(getEnv("BOT_TOKEN", "token"))
	channelID := getEnv("CHANNEL_ID", "channel_id")
	headline := "A new user bought a product using card visa with currency is " + currency
	for _, text := range []string{headline, messageBody} {
		if _, _, err := api.PostMessage(channelID, slack.MsgOptionText(text, false)); err != nil {
			fmt.Println("Failed to post to Slack:", err)
		}
	}
}

// MessageHandlerEuro handles payments made in EUR.
func MessageHandlerEuro(msg message.InboundMessage) {
	notifySlack(msg, "EURO")
}

// MessageHandlerUsd handles payments made in USD.
func MessageHandlerUsd(msg message.InboundMessage) {
	notifySlack(msg, "USD")
}

// getEnv returns the value of the environment variable key, or def if it is unset.
func getEnv(key, def string) string {
	if val, ok := os.LookupEnv(key); ok {
		return val
	}
	return def
}

func main() {
	// Configuration parameters
	brokerConfig := config.ServicePropertyMap{
		config.TransportLayerPropertyHost:                getEnv("TransportLayerPropertyHost", "tcps://"),
		config.ServicePropertyVPNName:                    getEnv("ServicePropertyVPNName", "brokername"),
		config.AuthenticationPropertySchemeBasicUserName: getEnv("AuthenticationPropertySchemeBasicUserName", "clientName"),
		config.AuthenticationPropertySchemeBasicPassword: getEnv("AuthenticationPropertySchemeBasicPassword", "password"),
	}
	messagingService, err := messaging.NewMessagingServiceBuilder().
		FromConfigurationProvider(brokerConfig).
		WithTransportSecurityStrategy(config.NewTransportSecurityStrategy().WithoutCertificateValidation()).
		Build()
	if err != nil {
		panic(err)
	}

	// Connect to the messaging service
	if err := messagingService.Connect(); err != nil {
		panic(err)
	}
	fmt.Println("Connected to the broker? ", messagingService.IsConnected())

	// Build a Direct Message Receiver
	directReceiver, err := messagingService.CreateDirectMessageReceiverBuilder().
		WithSubscriptions(resource.TopicSubscriptionOf(TopicPrefix + "/*/EUR/pm_card_visa/>")).
		Build()
	if err != nil {
		panic(err)
	}

	// Start Direct Message Receiver
	if err := directReceiver.Start(); err != nil {
		panic(err)
	}
	fmt.Println("Direct Receiver running? ", directReceiver.IsRunning())

	// Build another Direct Message Receiver
	anotherDirectReceiver, err := messagingService.CreateDirectMessageReceiverBuilder().
		WithSubscriptions(resource.TopicSubscriptionOf(TopicPrefix + "/*/USD/pm_card_visa/>")).
		Build()
	if err != nil {
		panic(err)
	}

	// Start another Direct Message Receiver
	if err := anotherDirectReceiver.Start(); err != nil {
		panic(err)
	}
	fmt.Println("Direct Receiver running? ", anotherDirectReceiver.IsRunning())

	// Register each handler once; the callbacks then run for every message.
	if regErr := directReceiver.ReceiveAsync(MessageHandlerEuro); regErr != nil {
		panic(regErr)
	}
	if regErr := anotherDirectReceiver.ReceiveAsync(MessageHandlerUsd); regErr != nil {
		panic(regErr)
	}

	// Keep the service alive until it is interrupted (Ctrl+C),
	// instead of spinning in a busy loop.
	interrupt := make(chan os.Signal, 1)
	signal.Notify(interrupt, os.Interrupt)
	<-interrupt
}
```
In the above code, you added steps to receive messages from the Solace broker service on two receivers. One receiver subscribes to topics whose path contains the `EUR` currency: `resource.TopicSubscriptionOf(TopicPrefix + "/*/EUR/pm_card_visa/>")`. The other subscribes to topics whose path contains the `USD` currency: `resource.TopicSubscriptionOf(TopicPrefix + "/*/USD/pm_card_visa/>")`. This works thanks to the wildcard-based topic filtering that Solace provides.
```go
// Build a Direct Message Receiver
directReceiver, err := messagingService.CreateDirectMessageReceiverBuilder().
	WithSubscriptions(resource.TopicSubscriptionOf(TopicPrefix + "/*/EUR/pm_card_visa/>")).
	Build()
if err != nil {
	panic(err)
}

// Start Direct Message Receiver
if err := directReceiver.Start(); err != nil {
	panic(err)
}
fmt.Println("Direct Receiver running? ", directReceiver.IsRunning())

// Build another Direct Message Receiver
anotherDirectReceiver, err := messagingService.CreateDirectMessageReceiverBuilder().
	WithSubscriptions(resource.TopicSubscriptionOf(TopicPrefix + "/*/USD/pm_card_visa/>")).
	Build()
if err != nil {
	panic(err)
}

// Start another Direct Message Receiver
if err := anotherDirectReceiver.Start(); err != nil {
	panic(err)
}
fmt.Println("Direct Receiver running? ", anotherDirectReceiver.IsRunning())
```
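Two receivers work well for two currencies. As the number of currencies grows, another option is a single, broader subscription whose handler reads the currency from the topic the message arrived on. The sketch below shows only the topic-parsing half of that idea in plain Go. The `paymentTopic` type and `parsePaymentTopic` function are hypothetical, and obtaining the topic from an inbound message (I believe the Solace Go API exposes it as `GetDestinationName`, but check the API reference) is left as an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// paymentTopic holds the levels that follow the events/payment-service prefix.
type paymentTopic struct {
	ProductID string
	Currency  string
	Method    string
	PaymentID string
}

// parsePaymentTopic splits a topic published by the payment service into
// its parts. A trailing slash, as produced by the publisher, is tolerated.
func parsePaymentTopic(topic string) (paymentTopic, error) {
	const prefix = "events/payment-service/"
	if !strings.HasPrefix(topic, prefix) {
		return paymentTopic{}, fmt.Errorf("unexpected topic prefix in %q", topic)
	}
	parts := strings.Split(strings.Trim(strings.TrimPrefix(topic, prefix), "/"), "/")
	if len(parts) != 4 {
		return paymentTopic{}, fmt.Errorf("expected 4 levels after the prefix, got %d", len(parts))
	}
	return paymentTopic{
		ProductID: parts[0],
		Currency:  parts[1],
		Method:    parts[2],
		PaymentID: parts[3],
	}, nil
}

func main() {
	// ab12c and f00ba4 stand in for the random product and payment ids.
	parsed, err := parsePaymentTopic("events/payment-service/ab12c/USD/pm_card_visa/f00ba4/")
	if err != nil {
		panic(err)
	}
	fmt.Println("currency:", parsed.Currency) // prints "currency: USD"
}
```

The trade-off is that the filtering moves from the broker into the service, so the two-receiver approach in the article remains the better fit when each consumer only cares about one currency.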
Now bring up the notification service again. Remember to export the environment variables for the `TransportLayerPropertyHost`, `ServicePropertyVPNName`, `AuthenticationPropertySchemeBasicUserName`, and `AuthenticationPropertySchemeBasicPassword` values of the broker service that you saved earlier.

```bash
export TransportLayerPropertyHost=${your_transport_layer_value}
export ServicePropertyVPNName=${your_service_property_value}
export AuthenticationPropertySchemeBasicUserName=${your_authen_user_name_value}
export AuthenticationPropertySchemeBasicPassword=${your_authen_password_value}
# Run the notification service
go run main.go
```
Test the Notification Sent to Slack when the Payment is Confirmed
Now that the payment service and notification service are implemented with steps for sending and receiving messages via Solace broker, it’s time to see if it works.
First, create a new payment using Postman with the currency set to `EUR`.
A new payment is created and its `gateway_reference` value is “pi_3M5ZZxDjuAE42eg03KlcuhYY”. Use this value for the confirm payment API.
You should successfully confirm the payment intent. Check the Slack channel of the customer service team to confirm that these messages were received.
Once the notification for the new `EUR` order has arrived in the Slack channel, check whether a notification is also received for a `USD` payment.
To do that, create a new payment with `USD` currency.
Check the Slack channel. You should see a new message that matches the `USD` payment.
Conclusion
And that’s how to build an event-driven application in Go using Solace PubSub+ Platform. EDA enables applications to scale up more easily to meet the workload generated by a growing number of users. It also helps decouple services so that software teams can mitigate the risk of a single point of failure. However, implementing EDA can be complicated and requires a lot of work from software developers to design the services and how they communicate with each other. Setting up the broker service makes it even harder, because the broker must be able to scale up and handle a large number of requests from multiple services communicating with each other. With Solace PubSub+, you no longer need to set up and maintain the broker service yourself, so you can focus on designing and implementing the event-driven architecture for your services.