MongoDB Change Streams Implementation in Golang

What are Change Streams?

Change streams is a near real-time ordered flow of information (stream) about any change to an item in a database, table/collection, or row of a table/document in a collection. For example, whenever any update (Insert, Update or Delete) occurs in a specific collection/table, the database triggers a change event with all the data which has been modified.

MongoDB Change Streams

MongoDB change streams provide a high-level API that can notify an application of changes to a MongoDB database, collection, or cluster, without using polling(which would come with much higher overhead). Characteristics of MongoDB Change Streams are:

  • Filterable : Applications can filter changes to receive only those change notifications they need.
  • Resumable : Change streams are resumable because each response comes with a resume token. Using the token, an application can start the stream where it left off (if it ever disconnects).
  • Ordered : Change notifications occur in the same order that the database was updated.
  • Durable : Change streams only include majority-committed changes. This is so every change seen by listening applications is durable in failure scenarios, such as electing a new primary.
  • Secure : Only users with rights to read a collection can create a change stream on that collection.
  • Easy to use : The syntax of the change streams API uses the existing MongoDB drivers and query language.

Experimenting with MongoDB Change Stream using Golang

Prerequisites

Getting Started with MongoDB Streams: Golang Implementation

# export MongoDB URI

export MONGODB_URI="mongodb+srv://admin:xxxxx@cluster0.ii90w.mongodb.net/myFirstDatabase?retryWrites=true&w=majority"

git clone https://github.com/ksingh7/mongodb-change-events-go.git
cd mongodb-change-events-go
go mod tidy
go run main.go

Demo Video

Here is my demo video recording that can help you understand this implementation.

Code Walkthrough

main.go file already has required guidelines in the form of comments. However, in this section, I will explain sections that I think are crucial

  • Declaring struct returned by MongoDB Stream API
type DbEvent struct {
DocumentKey documentKey `bson:"documentKey"`
OperationType string `bson:"operationType"`
}
type documentKey struct {
ID primitive.ObjectID `bson:"_id"`
}
  • Declaring a struct that resembles to the collection
type result struct {
ID primitive.ObjectID `bson:"_id"`
UserID string `bson:"userID"`
DeviceType string `bson:"deviceType"`
GameState string `bson:"gameState"`
}
  • Connect to MongoDB
client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(os.Getenv("MONGODB_URI")))
if err != nil {
panic(err)
}
  • Set DB and Collection names
database := client.Database("summit-demo")
collection := database.Collection("bike-factory")
  • Create a change stream
changeStream, err := collection.Watch(context.TODO(), mongo.Pipeline{})
if err != nil {
panic(err)
}
  • Iterate over the change stream
for changeStream.Next(context.TODO()) {
change := changeStream.Current
fmt.Printf("%+v\n", change)
}
  • Detect change type (Insert or Update) and accordingly fetch the document
// Print out the document that was inserted or updated
if DbEvent.OperationType == "insert" || DbEvent.OperationType == "update" {
// Find the mongodb document based on the objectID
var result result
err := collection.FindOne(context.TODO(), DbEvent.DocumentKey).Decode(&result)
if err != nil {
log.Fatal(err)
}
// Convert changd MongoDB document from BSON to JSON
data, writeErr := bson.MarshalExtJSON(result, false, false)
if writeErr != nil {
log.Fatal(writeErr)
}
// Print the changed document in JSON format
fmt.Println(string(data))
fmt.Println("")
}
  • Close the change stream
if err := changeStream.Close(context.TODO()); err != nil {
panic(err)
}

Bonus : Function to Insert records to MongoDB collection

func insertRecord(collection *mongo.Collection) {
// pre-populated values for DeviceType and GameState
DeviceType := make([]string, 0)
DeviceType = append(DeviceType, "mobile","laptop","karan-board","tablet","desktop","smart-watch")
GameState := make([]string, 0)
GameState = append(GameState, "playing","paused","stopped","finished","failed")

// insert new records to MongoDB every 5 seconds
for {
item := result{
ID: primitive.NewObjectID(),
UserID: strconv.Itoa(rand.Intn(10000)),
DeviceType: DeviceType[rand.Intn(len(DeviceType))],
GameState: GameState[rand.Intn(len(GameState))],
}
_, err := collection.InsertOne(context.TODO(), item)
if err != nil {
log.Fatal(err)
}

time.Sleep(5 * time.Second)
}
}

Summary

Hope this post gives you a better understanding of MongoDB Change Streams and how to use them in your application.

--

--

--

Senior Principal Architect & Developer Evangelist @ Red Hat ♦ Loves K8s, OpenShift, Cloud-Native, Serverless, Hybrid-Multi-Cloud, Distributed Systems

Love podcasts or audiobooks? Learn on the go with our new app.

Recommended from Medium

How to root Elephone P9

Root LG Phone

Ansible Playbook for VyOS and BGP Routing

Configure Forced Tunneling on Azure

Kubernetes Windows Cluster Creation Steps Including Windows AMI Creation with Packer guide

What’s new in .NET Framework 5.0

C# Object-oriented Programming(OOP)

[Frequently asked Product Management Interview Question] What is your favorite software Product

Named Entity Recognition From Wikipedia Article Using Spacy

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
Karan Singh

Karan Singh

Senior Principal Architect & Developer Evangelist @ Red Hat ♦ Loves K8s, OpenShift, Cloud-Native, Serverless, Hybrid-Multi-Cloud, Distributed Systems

More from Medium

Druid complex Lookup using other dimensions, with native query or Golang

Gin 101: Manage config files in Cloud Native way

Upload file using Go

Writing a simple WebApi with Golang and MsSql