Alex Fallenstedt

Streaming Tweets with Go

Building with free APIs is a great way to teach yourself new skills in languages you like. I’ve always found APIs to be an underrated way to learn something new. Building with them brings challenges that force you to learn parts of programming that video tutorials cannot teach.

Twitter’s filtered stream endpoint lets you filter the real-time stream of public Tweets. You can tap into Twitter discussions by filtering Tweets for specific attributes: you can find the latest job postings, monitor weather events, or keep on top of trends.

In this article I will discuss how to create Twitter rules and manage a stream with my open source library, twitterstream. I built this library for my project findtechjobs so I could find the latest tech jobs posted on Twitter.

If you want a complete code example to get started, head over to the examples on twitterstream.

Where do I start?

The first step is to create an app on Twitter Developers and obtain a set of consumer keys. Once you have an API key and an API secret key, you can generate an access token with twitterstream.

Generate an Access Token

We can use twitterstream to generate an access token. This access token will be used to authenticate all network requests going forward. In the code below, we make a network request to Twitter’s oauth2/token endpoint using the 'Basic' HTTP authentication scheme. Then we create an instance of twitterstream with our access token.

    tok, err := twitterstream.NewTokenGenerator().SetApiKeyAndSecret("YOUR_KEY", "YOUR_SECRET_KEY").RequestBearerToken()
    if err != nil {
        panic(err)
    }
    // Create an instance of the twitterstream client with the access token
    api := twitterstream.NewTwitterStream(tok.AccessToken)

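For context, the 'Basic' scheme base64-encodes `key:secret` and sends it in the Authorization header. The sketch below shows how such a token request could be assembled with Go’s standard library. It is not twitterstream’s internal code: the helper names `basicAuthHeader` and `newTokenRequest` are made up for illustration, and the request is only built, never sent.

```go
package main

import (
	"encoding/base64"
	"fmt"
	"net/http"
	"strings"
)

// basicAuthHeader is a hypothetical helper: it builds the Authorization
// header value for the "Basic" HTTP authentication scheme.
func basicAuthHeader(key, secret string) string {
	creds := key + ":" + secret
	return "Basic " + base64.StdEncoding.EncodeToString([]byte(creds))
}

// newTokenRequest prepares (but does not send) a bearer token request
// to Twitter's oauth2/token endpoint.
func newTokenRequest(key, secret string) (*http.Request, error) {
	body := strings.NewReader("grant_type=client_credentials")
	req, err := http.NewRequest(http.MethodPost, "https://api.twitter.com/oauth2/token", body)
	if err != nil {
		return nil, err
	}
	req.Header.Set("Authorization", basicAuthHeader(key, secret))
	req.Header.Set("Content-Type", "application/x-www-form-urlencoded;charset=UTF-8")
	return req, nil
}

func main() {
	req, err := newTokenRequest("YOUR_KEY", "YOUR_SECRET_KEY")
	if err != nil {
		fmt.Println("failed to build request:", err)
		return
	}
	fmt.Println(req.Method, req.URL.String())
	fmt.Println(req.Header.Get("Authorization"))
}
```

With twitterstream you never need to write this yourself; RequestBearerToken handles the request for you.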

Set up Streaming Rules

Streaming rules make your stream deliver relevant information. The rules match a variety of twitter attributes such as message keywords, hashtags, and URLs. Creating great rules is fundamental to having a successful twitter stream. It’s important to continue refining your rules as you stream so you can harvest relevant information.

Let’s create a stream for software engineer job postings with twitterstream. A valid job posting tweet should:

  • Be posted in the English language
  • Not be a retweet
  • Not be a quote tweet
  • Contain the word “hiring”
  • Contain the words “software developer” or “software engineer”

The twitterstream package makes building rules easy. We can use a NewRuleBuilder to create as many rules as the Twitter API allows for our consumer keys.

rules := twitterstream.NewRuleBuilder().
    AddRule("lang:en -is:retweet -is:quote hiring (software developer OR software engineer)", "hiring software role").
    Build()

res, err := api.Rules.Create(rules, false)

The first part is using twitterstream to create a NewRuleBuilder.

We pass in two arguments when we add our rule with AddRule. The first is a long string with many operators. Successive operators separated by a space are combined with boolean "AND" logic, meaning that Tweets will match only if every condition is met. For example, the rule cats dogs matches tweets that contain both the words “cats” and “dogs”. The second argument for AddRule is the tag label. This is free-form text you can use to identify the rules that matched a specific Tweet in the streaming response. Tags can be the same across rules.
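To make the two arguments concrete, here is a rough sketch of the JSON body that Twitter’s rules endpoint accepts, where each rule has a `value` and an optional `tag`. The Go type names (`rule`, `addRulesPayload`) and the `buildPayload` helper are hypothetical and exist only for this illustration; twitterstream builds the payload for you.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// rule mirrors one entry of the "add" array in the rules request body.
type rule struct {
	Value string `json:"value"`
	Tag   string `json:"tag,omitempty"`
}

// addRulesPayload is a hypothetical name for the request body type.
type addRulesPayload struct {
	Add []rule `json:"add"`
}

// buildPayload serializes a single rule into the request body.
func buildPayload(value, tag string) (string, error) {
	b, err := json.Marshal(addRulesPayload{Add: []rule{{Value: value, Tag: tag}}})
	return string(b), err
}

func main() {
	out, err := buildPayload("cats dogs", "cat and dog tweets")
	if err != nil {
		fmt.Println("failed to marshal:", err)
		return
	}
	fmt.Println(out)
}
```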

Let’s focus on the first argument. Each operator does something unique:

  • The first is lang:en, where en is a BCP 47 language identifier. This filters the stream for tweets posted in the English language. You can only use a single lang operator in a rule, and it cannot stand alone: it must be combined with at least one other operator.

  • Then we exclude retweets with -is:retweet. We use NOT logic (negation) by putting a minus sign in front of an operator. Negation can be applied to words too. For example, cat #meme -grumpy will match tweets that contain the word “cat” and the hashtag #meme but do not contain the word “grumpy”.

  • We also exclude quote tweets with -is:quote. Quote tweets are retweets with an added comment, and I’ve found this operator very useful. When I was building findtechjobs.io, I encountered a lot of people quote tweeting an article about automated hiring with their own opinion. These quote tweets cluttered my dataset with tweets that were not job postings.

  • I then narrow my stream to tweets that include the word hiring. People who tweet about jobs tend to say “My team is hiring…” or “StartupCo is hiring…”.

  • Finally, (software developer OR software engineer) is a grouping of operators combined with OR logic. Tweets will match if they satisfy either side of the OR, that is, if they mention “software developer” or “software engineer”.
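The operators above can also be assembled programmatically, which helps once you maintain several rules. This is a small sketch using only string helpers; `allOf`, `anyOf`, and `exclude` are hypothetical names and are not part of twitterstream.

```go
package main

import (
	"fmt"
	"strings"
)

// allOf joins operators with spaces, which the rule syntax treats as AND.
func allOf(ops ...string) string {
	return strings.Join(ops, " ")
}

// anyOf groups alternatives in parentheses and joins them with OR.
func anyOf(alts ...string) string {
	return "(" + strings.Join(alts, " OR ") + ")"
}

// exclude negates a single operator or keyword with a leading minus sign.
func exclude(op string) string {
	return "-" + op
}

func main() {
	rule := allOf(
		"lang:en",
		exclude("is:retweet"),
		exclude("is:quote"),
		"hiring",
		anyOf("software developer", "software engineer"),
	)
	// Prints the same rule string used in the AddRule call above.
	fmt.Println(rule)
}
```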

After we build our rules, we create them with api.Rules.Create. If you want to delete your rules, you can use api.Rules.Delete with the ID of each rule you currently have. You can find your current rules with api.Rules.Get.

You can learn more about rule operators here. Additionally, the endpoint that creates the rules is documented here.

Set the Unmarshal Hook

We need to create our own struct so each tweet can be unmarshaled into a typed value. Twitter’s filtered stream endpoint allows us to fetch additional information for each tweet (more on this later). To find this data easily, we create a struct that represents our data model.

type StreamDataExample struct {
    Data struct {
        Text      string    `json:"text"`
        ID        string    `json:"id"`
        CreatedAt time.Time `json:"created_at"`
        AuthorID  string    `json:"author_id"`
    } `json:"data"`
    Includes struct {
        Users []struct {
            ID       string `json:"id"`
            Name     string `json:"name"`
            Username string `json:"username"`
        } `json:"users"`
    } `json:"includes"`
    MatchingRules []struct {
        ID  string `json:"id"`
        Tag string `json:"tag"`
    } `json:"matching_rules"`
}

Every tweet that is streamed is returned as a []byte by default. We can turn our data into something usable by unmarshaling each tweet into the struct StreamDataExample. It’s important to set an unmarshal hook with SetUnmarshalHook so we can process the []byte in a goroutine-safe way.

api.SetUnmarshalHook(func(bytes []byte) (interface{}, error) {
    data := StreamDataExample{}

    // Decode the raw tweet and hand any decoding error back to the caller
    err := json.Unmarshal(bytes, &data)
    if err != nil {
        fmt.Printf("failed to unmarshal bytes: %v\n", err)
    }

    return data, err
})

If you are uncertain what your data model will look like, you can always create a string from the slice of bytes.

api.SetUnmarshalHook(func(bytes []byte) (interface{}, error) {
    return string(bytes), nil
})

Starting a Stream

After creating our streaming rules and unmarshal hook, we are ready to start streaming tweets.
By default, twitter returns a limited amount of information about each tweet when we stream. We can request additional information on each tweet with a stream expansion.

    streamExpansions := twitterstream.NewStreamQueryParamsBuilder().
        AddExpansion("author_id").
        AddTweetField("created_at").
        Build()

    // StartStream will start the stream
    err = api.StartStream(streamExpansions)

We first create some stream expansions with a NewStreamQueryParamsBuilder. This builder creates the query parameters to start our stream with. Here, we are adding two additional pieces of information to each tweet:

  • AddExpansion("author_id") will request the author’s id for each tweet streamed. This is useful if you are keeping track of users who are tweeting.
  • AddTweetField("created_at") will request the time the tweet was tweeted. This is useful if you need to sort tweets chronologically. You can learn more about the available stream expansions here.
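Under the hood, these options end up as query parameters on the streaming URL. The sketch below shows roughly what that query string looks like, using the `expansions` and `tweet.fields` parameter names from Twitter’s API. The `streamQuery` helper is hypothetical and is not how twitterstream necessarily implements its builder.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// streamQuery encodes expansions and tweet fields as a URL query string.
// Multiple values for one parameter are sent as a comma-separated list.
func streamQuery(expansions, tweetFields []string) string {
	q := url.Values{}
	if len(expansions) > 0 {
		q.Set("expansions", strings.Join(expansions, ","))
	}
	if len(tweetFields) > 0 {
		q.Set("tweet.fields", strings.Join(tweetFields, ","))
	}
	return q.Encode()
}

func main() {
	query := streamQuery([]string{"author_id"}, []string{"created_at"})
	fmt.Println(query)
}
```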

Then we start the stream with our expansions using api.StartStream. This method starts a long-running GET request to Twitter’s streaming endpoint. The response is parsed incrementally for the duration of the network request. If you are interested in learning more about how to consume streaming data from Twitter, you should read their documentation, Consuming Streaming Data.

Consuming the Stream

Each tweet that is processed in our long-running GET request is sent to a Go channel. We range over this channel to process each tweet and check for errors from Twitter. When we hit an error, we invoke api.StopStream to stop the stream, skip the remaining part of the loop with continue, and return to the top to wait for a close signal from the channel.

    // Start processing data from twitter after starting the stream
    for tweet := range api.GetMessages() {

        // Handle disconnections from twitter
        if tweet.Err != nil {
            fmt.Printf("got error from twitter: %v", tweet.Err)

            // Stop the stream and wait for the channel to close on the next iteration.
            api.StopStream()
            continue
        }
        result := tweet.Data.(StreamDataExample)

        // Here I am printing out the text.
        // You can send this off to a queue for processing.
        // Or do your processing here in the loop
        fmt.Println(result.Data.Text)
    }

Twitter’s servers attempt to hold the stream connection open indefinitely. When a disconnection happens, the error from Twitter is made available in the stream. Disconnections can occur for several reasons:

  • A streaming server is restarted on the Twitter side. This is usually related to a code deploy and should be generally expected and designed around.
  • Your account exceeded your daily/monthly quota of Tweets.
  • You have too many active redundant connections.
  • More disconnect reasons can be found here

Anticipating Disconnects from Twitter

It’s important to maintain the connection to Twitter for as long as possible, because missing relevant information in your stream can create poor datasets. You should expect disconnections to occur and build reconnection logic to handle them.

We can build reconnection logic using twitterstream’s API and a defer statement. A full example of handling reconnects can be found here. Below is a snippet:

// This will run forever
func initiateStream() {
    fmt.Println("Starting Stream")

    // Start the stream
    // And return the library's api
    api := fetchTweets()

    // When the loop below ends, restart the stream
    defer initiateStream()

    // Start processing data from twitter
    for tweet := range api.GetMessages() {
        if tweet.Err != nil {

            fmt.Printf("got error from twitter: %v", tweet.Err)

            api.StopStream()
            continue
        }
        result := tweet.Data.(StreamDataExample)
        fmt.Println(result.Data.Text)
    }
    fmt.Println("Stopped Stream")
}


After we have started the stream and before we start processing the tweets, we defer a call to the function itself. This handles reconnections to Twitter whenever the messages channel closes.

Final Thoughts

I hope you find this library useful in streaming tweets from twitter. Building this library was a challenge, and I learned how Go’s concurrency model works. If you liked this post, follow me on twitter as I document my journey in the software world.

Latest comments (1)

TnvMadhav⚡

Twitter API is such a gift and Twitter Query Scheme makes even more fun.

Thanks for sharing!