MongoDB Change Streams Implementation in Golang

What are Change Streams?

MongoDB Change Streams

  • Filterable : Applications can filter changes to receive only those change notifications they need.
  • Resumable : Change streams are resumable because each response comes with a resume token. Using the token, an application can start the stream where it left off (if it ever disconnects).
  • Ordered : Change notifications occur in the same order that the database was updated.
  • Durable : Change streams only include majority-committed changes. This is so every change seen by listening applications is durable in failure scenarios, such as electing a new primary.
  • Secure : Only users with rights to read a collection can create a change stream on that collection.
  • Easy to use : The syntax of the change streams API uses the existing MongoDB drivers and query language.

Experimenting with MongoDB Change Stream using Golang

Prerequisites

Getting Started with MongoDB Streams: Golang Implementation

# export MongoDB URI

export MONGODB_URI="mongodb+srv://admin:xxxxx@cluster0.ii90w.mongodb.net/myFirstDatabase?retryWrites=true&w=majority"

git clone https://github.com/ksingh7/mongodb-change-events-go.git
cd mongodb-change-events-go
go mod tidy
go run main.go

Demo Video

Code Walkthrough

  • Declaring struct returned by MongoDB Stream API
type DbEvent struct {
DocumentKey documentKey `bson:"documentKey"`
OperationType string `bson:"operationType"`
}
type documentKey struct {
ID primitive.ObjectID `bson:"_id"`
}
  • Declaring a struct that resembles to the collection
type result struct {
ID primitive.ObjectID `bson:"_id"`
UserID string `bson:"userID"`
DeviceType string `bson:"deviceType"`
GameState string `bson:"gameState"`
}
  • Connect to MongoDB
client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(os.Getenv("MONGODB_URI")))
if err != nil {
panic(err)
}
  • Set DB and Collection names
database := client.Database("summit-demo")
collection := database.Collection("bike-factory")
  • Create a change stream
changeStream, err := collection.Watch(context.TODO(), mongo.Pipeline{})
if err != nil {
panic(err)
}
  • Iterate over the change stream
for changeStream.Next(context.TODO()) {
change := changeStream.Current
fmt.Printf("%+v\n", change)
}
  • Detect change type (Insert or Update) and accordingly fetch the document
// Print out the document that was inserted or updated
if DbEvent.OperationType == "insert" || DbEvent.OperationType == "update" {
// Find the mongodb document based on the objectID
var result result
err := collection.FindOne(context.TODO(), DbEvent.DocumentKey).Decode(&result)
if err != nil {
log.Fatal(err)
}
// Convert changd MongoDB document from BSON to JSON
data, writeErr := bson.MarshalExtJSON(result, false, false)
if writeErr != nil {
log.Fatal(writeErr)
}
// Print the changed document in JSON format
fmt.Println(string(data))
fmt.Println("")
}
  • Close the change stream
if err := changeStream.Close(context.TODO()); err != nil {
panic(err)
}

Bonus : Function to Insert records to MongoDB collection

func insertRecord(collection *mongo.Collection) {
// pre-populated values for DeviceType and GameState
DeviceType := make([]string, 0)
DeviceType = append(DeviceType, "mobile","laptop","karan-board","tablet","desktop","smart-watch")
GameState := make([]string, 0)
GameState = append(GameState, "playing","paused","stopped","finished","failed")

// insert new records to MongoDB every 5 seconds
for {
item := result{
ID: primitive.NewObjectID(),
UserID: strconv.Itoa(rand.Intn(10000)),
DeviceType: DeviceType[rand.Intn(len(DeviceType))],
GameState: GameState[rand.Intn(len(GameState))],
}
_, err := collection.InsertOne(context.TODO(), item)
if err != nil {
log.Fatal(err)
}

time.Sleep(5 * time.Second)
}
}

Summary

--

--

Senior Principal Architect & Developer Evangelist @ Red Hat ♦ Loves K8s, OpenShift, Cloud-Native, Serverless, Hybrid-Multi-Cloud, Distributed Systems

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
Karan Singh

Senior Principal Architect & Developer Evangelist @ Red Hat ♦ Loves K8s, OpenShift, Cloud-Native, Serverless, Hybrid-Multi-Cloud, Distributed Systems