MongoDB Change Streams Implementation in Golang

What are Change Streams?

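MongoDB Change Streams let applications subscribe to real-time data changes on a collection, a database, or an entire deployment. Change streams are: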

  • Filterable : Applications can filter changes to receive only those change notifications they need.
  • Resumable : Change streams are resumable because each response comes with a resume token. Using the token, an application can start the stream where it left off (if it ever disconnects).
  • Ordered : Change notifications occur in the same order that the database was updated.
  • Durable : Change streams only include majority-committed changes. This is so every change seen by listening applications is durable in failure scenarios, such as electing a new primary.
  • Secure : Only users with rights to read a collection can create a change stream on that collection.
  • Easy to use : The syntax of the change streams API uses the existing MongoDB drivers and query language.

Experimenting with MongoDB Change Stream using Golang


Getting Started with MongoDB Streams: Golang Implementation

# export MongoDB URI

export MONGODB_URI="mongodb+srv://"

git clone
cd mongodb-change-events-go
go mod tidy
go run main.go


Code Walkthrough

  • Declare the structs that model the event returned by the change stream API
    type DbEvent struct {
        DocumentKey   documentKey `bson:"documentKey"`
        OperationType string      `bson:"operationType"`
    }

    type documentKey struct {
        ID primitive.ObjectID `bson:"_id"`
    }
  • Declare a struct that mirrors the documents in the collection
    type result struct {
        ID         primitive.ObjectID `bson:"_id"`
        UserID     string             `bson:"userID"`
        DeviceType string             `bson:"deviceType"`
        GameState  string             `bson:"gameState"`
    }
  • Connect to MongoDB
    client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(os.Getenv("MONGODB_URI")))
    if err != nil {
        log.Fatal(err)
    }
  • Set DB and Collection names
database := client.Database("summit-demo")
collection := database.Collection("bike-factory")
  • Create a change stream
    changeStream, err := collection.Watch(context.TODO(), mongo.Pipeline{})
    if err != nil {
        log.Fatal(err)
    }
  • Iterate over the change stream
    for changeStream.Next(context.TODO()) {
        change := changeStream.Current
        fmt.Printf("%+v\n", change)
        // handle the event here (see the next step)
    }
  • Detect the change type (insert or update) and fetch the document accordingly (this code runs inside the loop)
    // Decode the raw change event into the DbEvent struct
    var event DbEvent
    if err := changeStream.Decode(&event); err != nil {
        log.Fatal(err)
    }
    // Print out the document that was inserted or updated
    if event.OperationType == "insert" || event.OperationType == "update" {
        // Find the MongoDB document based on its ObjectID
        var doc result
        if err := collection.FindOne(context.TODO(), event.DocumentKey).Decode(&doc); err != nil {
            log.Fatal(err)
        }
        // Convert the changed MongoDB document from BSON to JSON
        data, writeErr := bson.MarshalExtJSON(doc, false, false)
        if writeErr != nil {
            log.Fatal(writeErr)
        }
        // Print the changed document in JSON format
        fmt.Println(string(data))
    }
  • Close the change stream
    if err := changeStream.Close(context.TODO()); err != nil {
        log.Fatal(err)
    }

Bonus : Function to insert records into the MongoDB collection

func insertRecord(collection *mongo.Collection) {
    // pre-populated values for DeviceType and GameState
    deviceTypes := []string{"mobile", "laptop", "karan-board", "tablet", "desktop", "smart-watch"}
    gameStates := []string{"playing", "paused", "stopped", "finished", "failed"}

    // insert a new record into MongoDB every 5 seconds
    for {
        item := result{
            ID:         primitive.NewObjectID(),
            UserID:     strconv.Itoa(rand.Intn(10000)),
            DeviceType: deviceTypes[rand.Intn(len(deviceTypes))],
            GameState:  gameStates[rand.Intn(len(gameStates))],
        }
        _, err := collection.InsertOne(context.TODO(), item)
        if err != nil {
            log.Fatal(err)
        }

        time.Sleep(5 * time.Second)
    }
}




Senior Principal Architect & Developer Evangelist @ Red Hat ♦ Loves K8s, OpenShift, Cloud-Native, Serverless, Hybrid-Multi-Cloud, Distributed Systems

Karan Singh
