TF-IDF using flambo

Posted July 22nd, 2014 by Muslim Baig in Clojure, Data, flambo, Analytics


flambo is a Clojure DSL for Spark created by the data team at Yieldbot. It allows you to create and manipulate Spark data structures using idiomatic Clojure. The following tutorial demonstrates typical flambo API usage and facilities by implementing the classic tf-idf algorithm.

The complete runnable code for this tutorial lives in the flambo.example.tfidf namespace, in flambo’s /test/flambo/example directory. We recommend you download flambo and follow along in your REPL.

What is tf-idf?

TF-IDF (term frequency-inverse document frequency) is a way to score the importance of terms in a document based on how frequently they appear across a collection of documents (corpus). The tf-idf weight of a term in a document is the product of its tf weight:

tf(t, d) = (number of times term t appears in document d) / (total number of terms in document d)

and its idf weight:

idf(t) = ln((total number of documents in corpus) / (1 + (number of documents with term t)))
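
As a quick sanity check of the two formulas, here is a minimal plain-Clojure sketch that needs no Spark. The helper names (tf-weight, idf-weight) and the sample numbers are our own illustration and are not part of flambo:

```clojure
;; Illustrative helpers only; these names are assumptions, not flambo functions.
(defn tf-weight
  "Occurrences of a term divided by the total number of terms in the document."
  [term-count doc-terms-count]
  (double (/ term-count doc-terms-count)))

(defn idf-weight
  "Natural log of (documents in corpus) / (1 + documents containing the term)."
  [num-docs docs-with-term]
  (Math/log (/ num-docs (+ 1.0 docs-with-term))))

;; A term that appears twice in a 9-term document and in 1 of 4 documents:
(def example-tf (tf-weight 2 9))
(def example-idf (idf-weight 4 1))
(def example-tfidf (* example-tf example-idf))
```

Here example-idf is ln(4/2) = ln 2, roughly 0.693. Note that with the 1 + smoothing in the denominator, a term found in 3 of 4 documents scores exactly 0, and a term found in every document gets a negative weight.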

Example Application Walkthrough

First, let’s start the REPL and load the namespaces we’ll need to implement our app:

lein repl
user=> (require '[flambo.api :as f])
user=> (require '[flambo.conf :as conf])

The flambo api and conf namespaces contain functions to access the Spark API and to create and modify Spark configuration objects, respectively.

Initializing Spark

flambo applications require a SparkContext object which tells Spark how to access a cluster. The SparkContext object requires a SparkConf object that encapsulates information about the application. We first build a spark configuration, c, then pass it to the flambo spark-context function which returns the requisite context object, sc:

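user=> (def master "local")   ; run Spark in local mode
user=> (def conf {})          ; empty property map, for illustration
user=> (def env {})           ; empty executor environment map, for illustration
user=> (def c (-> (conf/spark-conf)
                  (conf/master master)
                  (conf/app-name "tfidf")
                  (conf/set "spark.akka.timeout" "300")
                  (conf/set conf)
                  (conf/set-executor-env env)))
user=> (def sc (f/spark-context c))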

master is a special “local” string that tells Spark to run our app in local mode. In general, master can be a Spark, Mesos or YARN cluster URL, or any one of the special strings that run Spark in local mode (see the Spark documentation on master URLs for formatting details).

The app-name flambo function is used to set the name of our application.

As with most distributed computing systems, Spark has a myriad of properties that control most application settings. With flambo you can either set these properties directly on a SparkConf object, e.g., (conf/set "spark.akka.timeout" "300"), or via a Clojure map, (conf/set conf). We set an empty map, (def conf {}), for illustration.

Similarly, we set the executor runtime environment properties either directly via key/value strings or by passing a Clojure map of key/value strings. conf/set-executor-env handles both.

Computing TF-IDF

Our example will use the following corpus:

user=> (def documents
	 [["doc1" "Four score and seven years ago our fathers brought forth on this continent a new nation"]
	  ["doc2" "conceived in Liberty and dedicated to the proposition that all men are created equal"]
	  ["doc3" "Now we are engaged in a great civil war testing whether that nation or any nation so"]
	  ["doc4" "conceived and so dedicated can long endure We are met on a great battlefield of that war"]])

where doc# is a unique document id.

We use the corpus and spark context to create a Spark resilient distributed dataset (RDD). There are two ways to create RDDs in flambo:

  • parallelizing an existing Clojure collection, as we’ll do now:
user=> (def doc-data (f/parallelize sc documents))
  • reading a dataset from an external storage system

We are now ready to start applying actions and transformations to our RDD; this is where flambo truly shines (or rather burns bright). It utilizes the powerful abstractions available in Clojure to reason about data. You can use Clojure constructs such as the threading macro -> to chain sequences of operations and transformations.

Term Frequency

To compute the term frequencies, we need a dictionary of the terms in each document filtered by a set of stopwords. We pass the RDD, doc-data, of [doc-id content] tuples to the flambo flat-map transformation to get a new, stopword filtered RDD of [doc-id term term-frequency doc-terms-count] tuples. This is the dictionary for our corpus.

flat-map transforms the source RDD by passing each tuple through a function. It is similar to map, but the output is a collection of 0 or more items which is then flattened. We use the flambo named function macro flambo.api/defsparkfn to define our Clojure function gen-docid-term-tuples:

user=> (f/defsparkfn gen-docid-term-tuples [doc-tuple]
         (let [[doc-id content] doc-tuple
               terms (filter #(not (contains? stopwords %))
                             (clojure.string/split content #" "))
               doc-terms-count (count terms)
               term-frequencies (frequencies terms)]
           (map (fn [term] [doc-id term (term-frequencies term) doc-terms-count])
                (distinct terms))))
user=> (def doc-term-seq (-> doc-data
                             (f/flat-map gen-docid-term-tuples)
                             f/cache))

Notice how we use pure Clojure in our Spark function definition to operate on and transform input parameters. We’re able to filter stopwords, determine the number of terms per document and the term-frequencies for each document, all from within Clojure. Once the Spark function returns, flat-map serializes the results back to an RDD for the next action/transformation.
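
To see what this step produces for a single document without starting Spark, the same logic can be sketched with a plain defn and mapcat, Clojure’s local analogue of flat-map. The demo-stopwords set and the two sample documents below are hypothetical stand-ins; the stopwords set the tutorial actually uses is not shown in this post:

```clojure
(require '[clojure.string :as string])

;; Hypothetical stopword set for illustration; not the tutorial's actual set.
(def demo-stopwords #{"a" "and" "are" "in" "that" "the" "to" "we"})

(defn docid-term-tuples
  "Plain-Clojure counterpart of gen-docid-term-tuples: returns one
  [doc-id term term-frequency doc-terms-count] tuple per distinct term."
  [[doc-id content]]
  (let [terms (remove demo-stopwords (string/split content #" "))
        doc-terms-count (count terms)
        term-frequencies (frequencies terms)]
    (map (fn [term] [doc-id term (term-frequencies term) doc-terms-count])
         (distinct terms))))

;; Made-up documents, kept short so the result is easy to check by hand.
(def demo-docs [["d1" "the war and the nation"]
                ["d2" "a nation so great that nation"]])

;; mapcat maps each document to a seq of tuples, then concatenates the seqs.
(def local-doc-term-seq (mapcat docid-term-tuples demo-docs))
```

Each document contributes zero or more tuples, which is why the step is a flat-map rather than a map: a document made up entirely of stopwords would contribute nothing.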

This is flambo’s raison d’être. It handles all of the underlying serializations to/from the various Spark Java types, so you only need to define the sequence of operations you would like to perform on your data. That’s powerful.

Having constructed our dictionary we f/cache (or persist) the dataset in memory for future actions.

Recall that term frequency is defined as a function of the term and the document, tf(t, d). At this point we have an RDD of raw term frequencies, but we need normalized term frequencies. We use the flambo inline anonymous function macro, f/fn, to define an anonymous Clojure function to normalize the frequencies and map our doc-term-seq RDD of [doc-id term term-freq doc-terms-count] tuples to an RDD of key/value, [term [doc-id tf]], tuples. This new tuple format of the term-frequency RDD will later be used to join the inverse-document-frequency RDD and compute the final tf-idf weights.

user=> (def tf-by-doc (-> doc-term-seq
                          (f/map (f/fn [[doc-id term term-freq doc-terms-count]]
                                       [term [doc-id (double (/ term-freq doc-terms-count))]]))
                          f/cache))

Notice again how we were easily able to use the Clojure destructuring facilities on the arguments of our inline function to name parameters.

As before, we cache the results for future actions.
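
One detail worth noting is the call to double. Dividing two integers in Clojure yields an exact Ratio rather than a floating-point number, so the normalization converts explicitly. A small local sketch, using made-up tuples and a hypothetical helper name (normalize-tf), shows the reshaping:

```clojure
;; Made-up [doc-id term term-freq doc-terms-count] tuples for illustration.
(def tf-sample-tuples [["d1" "war" 1 2]
                       ["d2" "nation" 2 4]])

(defn normalize-tf
  "Reshapes a tuple to [term [doc-id tf]], keyed by term for a later join."
  [[doc-id term term-freq doc-terms-count]]
  [term [doc-id (double (/ term-freq doc-terms-count))]])

(def local-tf-by-doc (map normalize-tf tf-sample-tuples))

;; Without double, integer division stays an exact ratio such as 1/7.
(def raw-quotient (/ 1 7))
```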

Inverse Document Frequency

In order to compute the inverse document frequencies, we need the total number of documents:

user=> (def num-docs (f/count doc-data))

and the number of documents that contain each term. The following step maps over the distinct [doc-id term term-freq doc-terms-count] tuples to count the documents associated with each term. This is combined with the total document count to get an RDD of [term idf] tuples:

user=> (defn calc-idf [doc-count]
         (f/fn [[term tuple-seq]]
           (let [df (count tuple-seq)]
             [term (Math/log (/ doc-count (+ 1.0 df)))])))
user=> (def idf-by-term (-> doc-term-seq
                            (f/group-by (f/fn [[_ term _ _]] term))
                            (f/map (calc-idf num-docs))))
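
The group-then-count idea can be tried locally with Clojure’s own group-by. This is only a sketch on made-up tuples; the names idf-sample-tuples, sample-num-docs and local-idf are ours, not flambo’s:

```clojure
;; Made-up [doc-id term term-freq doc-terms-count] tuples: two documents,
;; with "nation" in both and "war" in only one.
(def idf-sample-tuples [["d1" "war" 1 2]
                        ["d1" "nation" 1 2]
                        ["d2" "nation" 2 4]])
(def sample-num-docs 2)

(defn local-idf
  "Takes a [term tuples] group and returns [term idf]. Each tuple in the
  group comes from a different document, so the group size is the df."
  [doc-count [term tuple-seq]]
  [term (Math/log (/ doc-count (+ 1.0 (count tuple-seq))))])

(def local-idf-by-term
  (->> idf-sample-tuples
       (group-by second)                         ; term -> tuples with that term
       (map (partial local-idf sample-num-docs))
       (into {})))                               ; term -> idf
```

Counting the grouped tuples gives the document frequency only because the dictionary holds one tuple per distinct term per document.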


Now that we have both a term-frequency RDD of [term [doc-id tf]] tuples and an inverse-document-frequency RDD of [term idf] tuples, we perform the aforementioned join on the “terms” producing a new RDD of [term [[doc-id tf] idf]] tuples. Then, we map an inline Spark function to compute the tf-idf weight of each term per document returning our final RDD of [doc-id term tf-idf] tuples:

user=> (def tfidf-by-term (-> (f/join tf-by-doc idf-by-term)
                              (f/map (f/fn [[term [[doc-id tf] idf]]]
                                           [doc-id term (* tf idf)]))
                              f/cache))

We again cache the RDD for future actions.
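
For intuition about the join, here is a rough local equivalent on made-up values: each [term [doc-id tf]] pair is matched with the idf for the same term, and pairs without a match are dropped, as in an inner join. The names and numbers are illustrative assumptions only:

```clojure
;; Made-up term-frequency pairs and idf values, chosen to multiply exactly.
(def sample-tf [["war" ["d1" 0.5]]
                ["nation" ["d1" 0.5]]
                ["nation" ["d2" 0.25]]
                ["orphan" ["d2" 0.25]]])      ; has no idf entry
(def sample-idf {"war" 0.5, "nation" 0.25})

(def local-tfidf
  (for [[term [doc-id tf]] sample-tf
        :let [idf (get sample-idf term)]
        :when (some? idf)]                    ; keep only terms present on both sides
    [doc-id term (* tf idf)]))
```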

Finally, to see the output of our example application we collect all the elements of our tf-idf RDD as a Clojure array, sort them by tf-idf weight, and for illustration print the top 10 to standard out:

user=> (->> tfidf-by-term
            f/collect
            ((partial sort-by last >))
            (take 10)
            clojure.pprint/pprint)
(["doc2" "created" 0.09902102579427793]
 ["doc2" "men" 0.09902102579427793]
 ["doc2" "Liberty" 0.09902102579427793]
 ["doc2" "proposition" 0.09902102579427793]
 ["doc2" "equal" 0.09902102579427793]
 ["doc3" "civil" 0.07701635339554948]
 ["doc3" "Now" 0.07701635339554948]
 ["doc3" "testing" 0.07701635339554948]
 ["doc3" "engaged" 0.07701635339554948]
 ["doc3" "whether" 0.07701635339554948])

You can also save the results to a text file via the flambo save-as-text-file function, or an HDFS sequence file via save-as-sequence-file, but we’ll leave those APIs for you to explore.


And that’s it, we’re done! We hope you found this tutorial of the flambo API useful and informative.

flambo is being actively improved, so you can expect more features as Spark continues to grow and we continue to support it. We’d love to hear your feedback on flambo.

Marceline's Instruments

Posted June 25th, 2014 by Homer Strong in Clojure, Data, Storm, Analytics


Last December Yieldbot open-sourced Marceline, our Clojure DSL for Storm’s Trident framework. We are excited to release our first major update to Marceline, version 0.2.0.

The primary additions in this release are wrappers for Storm’s built-in metrics system. Storm’s metrics API allows topologies to record and emit metrics. Read more on Storm metrics in the official documentation. We run production topologies instrumented with Marceline metrics and have found them to be stable; YMMV! Please file issues on GitHub if you encounter bugs or have ideas for how Marceline could be improved. See the Metrics section of the README for usage. Also note that Marceline’s metrics can be useful for any Clojure Storm topologies, either with vanilla Storm or Trident.

Marceline’s exposure of Storm metrics has been very useful for monitoring the behavior of Yieldbot’s topologies. Friction around instrumentation has been greatly reduced. Code smells are down. Metrics now entail fewer lines of code and less duplication. An additional architectural benefit is that dependencies on external services can be isolated to individual topology components. It is painless to add typical metrics while maintaining enough flexibility for custom metrics when necessary. We have designed Marceline’s metrics specifically with the goal of leveraging Storm’s metrics API unobtrusively.

As Yieldbot’s backend scales it is increasingly crucial to monitor topologies. Simultaneously, new features require iterations on what quantities are monitored. While topology metrics are primarily interesting to developers, these metrics are often directly related to data-driven business concerns. Several of Yieldbot’s Key Performance Indicators (KPIs) are powered by Storm and Marceline, so the availability of a fantastic metrics API translates to greater transparency within the organization.

If you’re interested in such data engineering topics as this, check out some of the exciting careers at Yieldbot!

-- @strongh

Say Hello to Marceline

Posted December 16th, 2013 by Soren Macbeth in Storm, Clojure, Data, Analytics


Yieldbot is pleased to announce the public release of Marceline, a Clojure DSL for Trident.

Storm plays a central role in Yieldbot's real-time data processing systems. From data collection and ETL to powering on-line machine learning algorithms, we rely heavily on Storm to process vast amounts of data efficiently. Trident is a high-level abstraction on top of Storm, analogous to Cascading for Hadoop. Trident, like Cascading, is written in Java. This simply would not do.

Clojure, a Lisp dialect which runs on the JVM, forms the base of the software stack for the data team at Yieldbot. We love Clojure because it allows us to quickly and interactively build our data processing systems and machine learning algorithms. Clojure gives us the REPL-based development of a dynamic, functional language with the performance and stability of the JVM. With Marceline, we get the best of both worlds by being able to develop and test our Trident topologies in Clojure, uberjar them up and ship them off to our production environments.

Marceline is still young, but we have been running it in production without issue. For more information about how to use it, including examples, please see the README on GitHub. Special thanks to Dan Herrera and Steven Surgnier for their help in testing, writing documentation and providing additional examples.

-- @sorenmacbeth

