Streaming Tweets with Go

Dec 29, 2021

Building with free APIs is a great way to teach yourself new skills in languages you like. I’ve always found APIs to be an underrated way to learn something new. Building with APIs brings challenges that force you to learn parts of programming that video tutorials cannot teach.

Twitter’s filtered stream endpoint allows you to filter the real-time stream of public Tweets. You can tap into Twitter discussions by filtering Tweets for specific attributes. You can find the latest job postings, monitor weather events, or keep on top of trends.

In this article I will discuss how to create twitter rules and manage a stream with my open source library twitterstream. This library was built for my project findtechjobs so I could find the latest tech jobs posted on twitter.

If you want a complete code example to get started, head over to the examples on twitterstream.


Where do I start?

The first step is to create an app on Twitter Developers and obtain a set of consumer keys. Once you have an API key and an API secret key, you can generate an access token with twitterstream.

Generate an Access Token

We can use twitterstream to generate an access token. This access token will be used to authenticate all network requests going forward. In the code below, we make a network request to Twitter’s oauth2/token endpoint with the ‘Basic’ HTTP Authentication Scheme. Then we create an instance of twitterstream with our access token.

	tok, err := twitterstream.NewTokenGenerator().SetApiKeyAndSecret("YOUR_KEY", "YOUR_SECRET_KEY").RequestBearerToken()
	if err != nil {
		panic(err)
	}
	// Create an instance of twitter api
	api := twitterstream.NewTwitterStream(tok.AccessToken)
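To make the token step less of a black box, here is a minimal sketch of what a request to the oauth2/token endpoint with the Basic scheme could look like using only the standard library. The helper name newBearerTokenRequest is my own for illustration and is not part of twitterstream; the library may build its request differently.

```go
package main

import (
	"fmt"
	"net/http"
	"net/url"
	"strings"
)

// tokenURL is Twitter's OAuth 2.0 application-only token endpoint.
const tokenURL = "https://api.twitter.com/oauth2/token"

// newBearerTokenRequest is a hypothetical helper (not part of twitterstream).
// It builds, but does not send, a client-credentials request that is
// authenticated with the Basic scheme.
func newBearerTokenRequest(apiKey, apiSecret string) (*http.Request, error) {
	form := url.Values{"grant_type": {"client_credentials"}}
	req, err := http.NewRequest(http.MethodPost, tokenURL, strings.NewReader(form.Encode()))
	if err != nil {
		return nil, err
	}
	// SetBasicAuth base64-encodes "apiKey:apiSecret" into the Authorization header.
	req.SetBasicAuth(apiKey, apiSecret)
	req.Header.Set("Content-Type", "application/x-www-form-urlencoded;charset=UTF-8")
	return req, nil
}

func main() {
	req, err := newBearerTokenRequest("YOUR_KEY", "YOUR_SECRET_KEY")
	if err != nil {
		fmt.Println("failed to build request:", err)
		return
	}
	// Print the request line and the generated Authorization header.
	fmt.Println(req.Method, req.URL)
	fmt.Println(req.Header.Get("Authorization"))
}
```

Sending this request with an http.Client and decoding the JSON response would yield the bearer token that twitterstream exposes as tok.AccessToken.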

Set up Streaming Rules

Streaming rules make your stream deliver relevant information. The rules match a variety of twitter attributes such as message keywords, hashtags, and URLs. Creating great rules is fundamental to having a successful twitter stream. It’s important to continue refining your rules as you stream so you can harvest relevant information.

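Let’s create a stream for software engineer job postings with twitterstream. A valid job posting tweet should be written in English, should not be a retweet or a quote tweet, and should mention hiring along with a software developer or software engineer role.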

The twitterstream package makes building rules easy. We can use a NewRuleBuilder to create as many rules as the Twitter API allows for our consumer keys.

rules := twitterstream.NewRuleBuilder().
	AddRule("lang:en -is:retweet -is:quote hiring (software developer OR software engineer)", "hiring software role").
	Build()

res, err := api.Rules.Create(rules, false)

The first part is using twitterstream to create a NewRuleBuilder.

We pass in two arguments when we add our rule with AddRule. The first is a long string with many operators. Successive operators with a space between them will result in boolean “AND” logic, meaning that Tweets will match only if both conditions are met. For example, cats dogs will match tweets that contain the words “cats” and “dogs”. The second argument for AddRule is the tag label. This is free-form text you can use to identify the rules that matched a specific Tweet in the streaming response. Tags can be the same across rules.

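Let’s focus on the first argument. Each operator does something unique: lang:en matches tweets classified as English, -is:retweet excludes retweets, -is:quote excludes quote tweets, hiring matches the keyword “hiring”, and the group in parentheses matches tweets that contain either the words “software” and “developer” or the words “software” and “engineer”.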

After we build our rules, we create them with api.Rules.Create. If you want to delete your rules, you can use api.Rules.Delete with the ID of each rule you currently have. You can find your current rules with api.Rules.Get.

You can learn more about rule operators here. Additionally, the endpoint that creates the rules is documented here.
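For a rough idea of what ends up on the wire, the rules endpoint accepts a JSON body with an add array whose entries carry a value and an optional tag. The sketch below builds that body with encoding/json. The type and function names are hypothetical, and I am not claiming this is how twitterstream serializes rules internally.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// rule mirrors one entry of the "add" array accepted by the filtered
// stream rules endpoint. The Go type names are hypothetical.
type rule struct {
	Value string `json:"value"`
	Tag   string `json:"tag,omitempty"`
}

// addRulesRequest is the top-level JSON body for creating rules.
type addRulesRequest struct {
	Add []rule `json:"add"`
}

// buildAddRulesBody marshals the given rules into the request body.
func buildAddRulesBody(rules ...rule) (string, error) {
	b, err := json.Marshal(addRulesRequest{Add: rules})
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	body, err := buildAddRulesBody(rule{
		Value: "lang:en -is:retweet -is:quote hiring (software developer OR software engineer)",
		Tag:   "hiring software role",
	})
	if err != nil {
		fmt.Println("failed to build body:", err)
		return
	}
	fmt.Println(body)
}
```

Seeing the payload makes it clearer why the tag is free-form: it is just a string stored next to the rule value and echoed back with each matching tweet.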

Set the Unmarshal Hook

We need to create our own struct for our tweets so we can unmarshal each tweet into a value we can work with. Twitter’s Filtered Stream endpoint allows us to fetch additional information for each tweet (more on this later). To allow us to find this data easily, we need to create a struct that will represent our data model.

type StreamDataExample struct {
    Data struct {
        Text      string    `json:"text"`
        ID        string    `json:"id"`
        CreatedAt time.Time `json:"created_at"`
        AuthorID  string    `json:"author_id"`
    } `json:"data"`
    Includes struct {
        Users []struct {
            ID       string `json:"id"`
            Name     string `json:"name"`
            Username string `json:"username"`
        } `json:"users"`
    } `json:"includes"`
    MatchingRules []struct {
        ID  string `json:"id"`
        Tag string `json:"tag"`
    } `json:"matching_rules"`
}

Every tweet that is streamed is returned as a []byte by default. We can turn our data into something usable by unmarshaling each tweet into the struct StreamDataExample. It’s important to set an unmarshal hook with SetUnmarshalHook so we can process the []byte in a goroutine-safe way.

api.SetUnmarshalHook(func(bytes []byte) (interface{}, error) {
    data := StreamDataExample{}

    // err is scoped to this if statement, so return it from inside the block
    if err := json.Unmarshal(bytes, &data); err != nil {
        fmt.Printf("failed to unmarshal bytes: %v", err)
        return data, err
    }

    return data, nil
})

If you are uncertain what your data model will look like, you can always create a string from the slice of bytes.

api.SetUnmarshalHook(func(bytes []byte) (interface{}, error) {
	return string(bytes), nil
})
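If you want to check your data model before connecting to a live stream, you can run the same unmarshal logic against a sample payload. The sketch below repeats the struct from above so it is self-contained. The sample JSON is made up for illustration and only follows the shape that the struct tags describe, and decodeTweet is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// StreamDataExample is the same data model used in this article.
type StreamDataExample struct {
	Data struct {
		Text      string    `json:"text"`
		ID        string    `json:"id"`
		CreatedAt time.Time `json:"created_at"`
		AuthorID  string    `json:"author_id"`
	} `json:"data"`
	Includes struct {
		Users []struct {
			ID       string `json:"id"`
			Name     string `json:"name"`
			Username string `json:"username"`
		} `json:"users"`
	} `json:"includes"`
	MatchingRules []struct {
		ID  string `json:"id"`
		Tag string `json:"tag"`
	} `json:"matching_rules"`
}

// sampleTweet is an invented payload, not real data from Twitter.
const sampleTweet = `{
  "data": {"id": "1", "text": "We are hiring a software engineer",
           "created_at": "2021-12-29T12:00:00.000Z", "author_id": "42"},
  "includes": {"users": [{"id": "42", "name": "Example Co", "username": "exampleco"}]},
  "matching_rules": [{"id": "7", "tag": "hiring software role"}]
}`

// decodeTweet is a hypothetical helper with the same body as the unmarshal hook.
func decodeTweet(b []byte) (StreamDataExample, error) {
	data := StreamDataExample{}
	if err := json.Unmarshal(b, &data); err != nil {
		return data, err
	}
	return data, nil
}

func main() {
	tweet, err := decodeTweet([]byte(sampleTweet))
	if err != nil {
		fmt.Println("failed to unmarshal:", err)
		return
	}
	fmt.Println(tweet.Data.Text, "by", tweet.Includes.Users[0].Username)
}
```

Note that time.Time only decodes timestamps in RFC 3339 form, which is the form used in the sample above.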

Starting a Stream

After creating our streaming rules and unmarshal hook, we are ready to start streaming tweets. By default, twitter returns a limited amount of information about each tweet when we stream. We can request additional information on each tweet with a stream expansion.

    streamExpansions := twitterstream.NewStreamQueryParamsBuilder().
        AddExpansion("author_id").
        AddTweetField("created_at").
        Build()

    // StartStream will start the stream
    err = api.StartStream(streamExpansions)

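We first create some stream expansions with a NewStreamQueryParamsBuilder. This builder will create query parameters to start our stream with. Here, we are adding two additional pieces of information to each tweet: the author, through the author_id expansion, and the creation time, through the created_at tweet field.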
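To see what those query parameters amount to, here is a small sketch that assembles a stream URL with net/url. The function streamURL is hypothetical, and I am assuming the builder maps onto the expansions and tweet.fields parameters of the filtered stream endpoint; the library may encode them differently.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// streamEndpoint is Twitter's v2 filtered stream endpoint.
const streamEndpoint = "https://api.twitter.com/2/tweets/search/stream"

// streamURL is a hypothetical helper that joins each list with commas
// and attaches it as a query parameter.
func streamURL(expansions, tweetFields []string) string {
	q := url.Values{}
	if len(expansions) > 0 {
		q.Set("expansions", strings.Join(expansions, ","))
	}
	if len(tweetFields) > 0 {
		q.Set("tweet.fields", strings.Join(tweetFields, ","))
	}
	encoded := q.Encode() // keys are sorted; commas are percent-encoded
	if encoded == "" {
		return streamEndpoint
	}
	return streamEndpoint + "?" + encoded
}

func main() {
	fmt.Println(streamURL([]string{"author_id"}, []string{"created_at"}))
}
```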

Then we start the stream with our expansions using api.StartStream. This method will start a long-running GET request to Twitter’s streaming endpoint. The response is parsed incrementally throughout the duration of the network request. If you are interested in learning more about how to consume streaming data from Twitter, then you should read their documentation, Consuming Streaming Data.

Consuming the Stream

Each tweet that is processed in our long-running GET request is sent to a Go channel. We range over this channel to process each tweet and check for errors from Twitter. The stream will stop when we invoke api.StopStream. We then skip the remaining part of the loop, return to the top, and wait for a close signal from the channel.

    // Start processing data from twitter after starting the stream
    for tweet := range api.GetMessages() {

        // Handle disconnections from twitter
        if tweet.Err != nil {
            fmt.Printf("got error from twitter: %v", tweet.Err)

            // Stop the stream and wait for the channel to close on the next iteration.
            api.StopStream()
            continue
        }
        result := tweet.Data.(StreamDataExample)

        // Here I am printing out the text.
        // You can send this off to a queue for processing.
        // Or do your processing here in the loop
        fmt.Println(result.Data.Text)
    }

Twitter’s servers attempt to hold the stream connection indefinitely, but disconnections can still occur for several reasons. When one does, the error from Twitter is made available in the stream.

Anticipating Disconnects from Twitter

It’s important to maintain the connection to Twitter for as long as possible, because missing relevant information in your stream can create poor datasets. You should expect disconnections to occur and build reconnection logic to handle them.

We can build reconnection logic using twitterstream’s API and a defer statement. A full example of handling reconnects can be found here. Below is a snippet.

// This will run forever
func initiateStream() {
	fmt.Println("Starting Stream")

	// Start the stream
	// And return the library's api
	api := fetchTweets()

	// When the loop below ends, restart the stream
	defer initiateStream()

	// Start processing data from twitter
	for tweet := range api.GetMessages() {
		if tweet.Err != nil {

			fmt.Printf("got error from twitter: %v", tweet.Err)

			api.StopStream()
			continue
		}
		result := tweet.Data.(StreamDataExample)
		fmt.Println(result.Data.Text)
	}
	fmt.Println("Stopped Stream")
}

After we have started the stream and before we start processing the tweets, we defer a call to the function itself. When the messages channel closes and the loop ends, the deferred call runs and reconnects to Twitter.

Final Thoughts

I hope you find this library useful in streaming tweets from twitter. Building this library was a challenge, and I learned how Go’s concurrency model works. If you liked this post, follow me on twitter as I document my journey in the software world.