MongoDB Change Streams Implementation in Golang

What are Change Streams?

Change streams is a near real-time ordered flow of information (stream) about any change to an item in a database, table/collection, or row of a table/document in a collection. For example, whenever any update (Insert, Update or Delete) occurs in a specific collection/table, the database triggers a change event with all the data which has been modified.

MongoDB Change Streams

MongoDB change streams provide a high-level API that can notify an application of changes to a MongoDB database, collection, or cluster, without using polling(which would come with much higher overhead). Characteristics of MongoDB Change Streams are:

  • Filterable : Applications can filter changes to receive only those change notifications they need.
  • Resumable : Change streams are resumable because each response comes with a resume token. Using the token, an application can start the stream where it left off (if it ever disconnects).
  • Ordered : Change notifications occur in the same order that the database was updated.
  • Durable : Change streams only include majority-committed changes. This is so every change seen by listening applications is durable in failure scenarios, such as electing a new primary.
  • Secure : Only users with rights to read a collection can create a change stream on that collection.
  • Easy to use : The syntax of the change streams API uses the existing MongoDB drivers and query language.

Experimenting with MongoDB Change Stream using Golang

Prerequisites

Getting Started with MongoDB Streams: Golang Implementation

# export MongoDB URI

export MONGODB_URI="mongodb+srv://admin:xxxxx@cluster0.ii90w.mongodb.net/myFirstDatabase?retryWrites=true&w=majority"

git clone https://github.com/ksingh7/mongodb-change-events-go.git
cd mongodb-change-events-go
go mod tidy
go run main.go

Demo Video

Here is my demo video recording that can help you understand this implementation.

Code Walkthrough

main.go file already has required guidelines in the form of comments. However, in this section, I will explain sections that I think are crucial

  • Declaring struct returned by MongoDB Stream API
type DbEvent struct {
DocumentKey documentKey `bson:"documentKey"`
OperationType string `bson:"operationType"`
}
type documentKey struct {
ID primitive.ObjectID `bson:"_id"`
}
  • Declaring a struct that resembles to the collection
type result struct {
ID primitive.ObjectID `bson:"_id"`
UserID string `bson:"userID"`
DeviceType string `bson:"deviceType"`
GameState string `bson:"gameState"`
}
  • Connect to MongoDB
client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(os.Getenv("MONGODB_URI")))
if err != nil {
panic(err)
}
  • Set DB and Collection names
database := client.Database("summit-demo")
collection := database.Collection("bike-factory")
  • Create a change stream
changeStream, err := collection.Watch(context.TODO(), mongo.Pipeline{})
if err != nil {
panic(err)
}
  • Iterate over the change stream
for changeStream.Next(context.TODO()) {
change := changeStream.Current
fmt.Printf("%+v\n", change)
}
  • Detect change type (Insert or Update) and accordingly fetch the document
// Print out the document that was inserted or updated
if DbEvent.OperationType == "insert" || DbEvent.OperationType == "update" {
// Find the mongodb document based on the objectID
var result result
err := collection.FindOne(context.TODO(), DbEvent.DocumentKey).Decode(&result)
if err != nil {
log.Fatal(err)
}
// Convert changd MongoDB document from BSON to JSON
data, writeErr := bson.MarshalExtJSON(result, false, false)
if writeErr != nil {
log.Fatal(writeErr)
}
// Print the changed document in JSON format
fmt.Println(string(data))
fmt.Println("")
}
  • Close the change stream
if err := changeStream.Close(context.TODO()); err != nil {
panic(err)
}

Bonus : Function to Insert records to MongoDB collection

func insertRecord(collection *mongo.Collection) {
// pre-populated values for DeviceType and GameState
DeviceType := make([]string, 0)
DeviceType = append(DeviceType, "mobile","laptop","karan-board","tablet","desktop","smart-watch")
GameState := make([]string, 0)
GameState = append(GameState, "playing","paused","stopped","finished","failed")

// insert new records to MongoDB every 5 seconds
for {
item := result{
ID: primitive.NewObjectID(),
UserID: strconv.Itoa(rand.Intn(10000)),
DeviceType: DeviceType[rand.Intn(len(DeviceType))],
GameState: GameState[rand.Intn(len(GameState))],
}
_, err := collection.InsertOne(context.TODO(), item)
if err != nil {
log.Fatal(err)
}

time.Sleep(5 * time.Second)
}
}

Summary

Hope this post gives you a better understanding of MongoDB Change Streams and how to use them in your application.

--

--

--

Senior Principal Architect & Developer Evangelist @ Red Hat ♦ Loves K8s, OpenShift, Cloud-Native, Serverless, Hybrid-Multi-Cloud, Distributed Systems

Love podcasts or audiobooks? Learn on the go with our new app.

Recommended from Medium

Terraform Workspaces For Beginners

5 Step process for Cloud migration — Part 1

Clear Program .cache files on Ubuntu 20.10

Introduction to “If Condition” Activity in Azure Data Factory

Watch your instance variables!

Chingu Weekly Vol. 49

Speech Recognition in just 10 Lines of Python Code

#STACS Continuous Delivery with AWS CodePipeline and ECS Fargate

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
Karan Singh

Karan Singh

Senior Principal Architect & Developer Evangelist @ Red Hat ♦ Loves K8s, OpenShift, Cloud-Native, Serverless, Hybrid-Multi-Cloud, Distributed Systems

More from Medium

Concurrency in Golang

Gopher image

API development with type-safety across the entire stack

Securing gRPC connection with SSL/TLS Certificate using Go

A First Look into Concurrency in Go