Assertion: A large part of today's tech industry can be described as some combination of sending, transforming and consuming streams of data. A few quick examples:
The Reactive Streams standard defines an upstream demand channel and a downstream data channel. Publishers do not send data until a request for N elements arrives via the demand channel, at which point they are free to push up to N elements downstream either in batches or individually. When outstanding demand exists, the publisher is free to push data to the subscriber as it becomes available. When demand is exhausted, the publisher cannot send data except as a response to demand signalled from downstream. This lack of demand, or backpressure, propagates upstream in a controlled manner, allowing the source node to choose between starting up more resources, slowing down, or dropping data.
What's cool is that since data and demand travel in opposite directions, merging streams splits the upstream demand and splitting streams merges the downstream demand.
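Here is a toy, single-threaded model of the demand rule in plain Scala. DemandPublisher and DemandDemo are hypothetical names. This is not a Reactive Streams implementation: it has no Subscription object, no thread safety, and no completion or error signals. It only shows that a publisher emits while outstanding demand is positive and stops when demand runs out.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical toy model, NOT a Reactive Streams implementation:
// single-threaded, no Subscription, no onComplete/onError.
// Its only purpose is to show that elements flow only while demand > 0.
final class DemandPublisher[T](source: Iterator[T], onNext: T => Unit) {
  private var demand: Long = 0L

  // The subscriber signals that it can accept n more elements.
  def request(n: Long): Unit = {
    require(n > 0, "demand must be positive")
    demand += n
    // Push elements only while there is outstanding demand and data left.
    while (demand > 0 && source.hasNext) {
      demand -= 1
      onNext(source.next())
    }
  }

  // Demand that has been requested but not yet satisfied.
  def outstanding: Long = demand
}

object DemandDemo {
  def main(args: Array[String]): Unit = {
    val received = ArrayBuffer.empty[Int]
    val publisher = new DemandPublisher[Int](Iterator.range(1, 6), x => { received += x; () })

    publisher.request(2)            // subscriber is ready for two elements
    println(received.toList)        // List(1, 2): nothing more without demand
    publisher.request(10)           // more demand than the source can satisfy
    println(received.toList)        // List(1, 2, 3, 4, 5)
    println(publisher.outstanding)  // 7 units of demand remain unmet
  }
}
```

When the source runs dry, any unmet demand stays outstanding. At that point a real publisher would signal onComplete.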
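Reactive Streams (RS) is defined by the following minimal, heavily tested set of interfaces, shown here in simplified Scala form (the specification itself is published as Java interfaces).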
trait Publisher[T] {
  def subscribe(s: Subscriber[T]): Unit
}
trait Subscriber[T] {
  def onSubscribe(s: Subscription): Unit
  def onNext(t: T): Unit
  def onError(t: Throwable): Unit
  def onComplete(): Unit
}
trait Subscription {
  def request(n: Long): Unit // demand is a Long in the specification
  def cancel(): Unit
}
This is great, but it's all very low level. Look at all those Unit return types! Fortunately, there's a domain-specific language for transforming and composing stream processing graphs.
Akka Streams has two major components:
We're going to use Akka Streams to count the number of times words are used in comments on each of the most popular sub-forums on Reddit. (Recap: Reddit is structured with subreddits at the top level. Users can post links to subreddits and add comments to links. Links and comments can be voted 'up' or 'down' by users.) We're going to use Reddit's API to get a list of popular subreddits, get a list of popular links for each subreddit, and then get popular comments for each link. Finally, we'll persist word counts for the comments of each subreddit.
Let's start with an overview of the types we'll be working with.
Scala Futures:
You can skip this paragraph if you're already familiar with Scala's Future class. If not, a Future[T] is an object holding a value of type T which may become available at some point. This value is usually the result of some other computation. For example,
def execute(req: HttpRequest)(implicit ec: ExecutionContext): Future[HttpResponse]
is a function that executes an HttpRequest and, instead of blocking until a response is received, immediately returns a Future[HttpResponse]. (The implicit ec: ExecutionContext parameter provides a thread pool for executing callbacks on futures, which is outside the scope of this post.)
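Here is a small, self-contained illustration that uses only the standard library. slowLength is a hypothetical stand-in for a network call such as execute. It returns a Future immediately and completes it later on the global ExecutionContext.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FutureDemo {
  // Hypothetical stand-in for a slow network call: returns a Future at once
  // and completes it later on the global ExecutionContext.
  def slowLength(s: String): Future[Int] = Future {
    Thread.sleep(100) // simulated latency
    s.length
  }

  def main(args: Array[String]): Unit = {
    val f: Future[Int] = slowLength("reddit")  // returns without blocking
    val doubled: Future[Int] = f.map(_ * 2)     // transformation is also non-blocking
    // Callback scheduled on the ExecutionContext once the value is available.
    doubled.onComplete(result => println(s"callback: $result"))
    // Blocking here only so the demo can print a result before main exits.
    println(Await.result(doubled, 2.seconds))    // 12
  }
}
```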
Reddit API:
import scala.concurrent.{ExecutionContext, Future}
type WordCount = Map[String, Int]
case class LinkListing(links: Seq[Link])
case class Link(id: String, subreddit: String)
case class CommentListing(subreddit: String, comments: Seq[Comment])
case class Comment(subreddit: String, body: String)
trait RedditAPI {
  def popularLinks(subreddit: String)(implicit ec: ExecutionContext): Future[LinkListing]
  def popularComments(link: Link)(implicit ec: ExecutionContext): Future[CommentListing]
  def popularSubreddits(implicit ec: ExecutionContext): Future[Seq[String]]
}
An instance of the type Source[Out] produces a potentially unbounded stream of elements of type Out. We'll start by creating a stream of subreddit names, represented as Strings.
Sources can be created from Vectors (an indexed sequence roughly equivalent to an Array).
val subreddits: Source[String] = Source(args.toVector)
Try it out:
import com.`inanna-malick`.Main._
import akka.stream.scaladsl._
Source(Array("funny", "sad").toVector).runForeach(println)
As expected:
funny
sad
Single-element sources can also be created from Futures, resulting in a Source that emits the result of the future if it succeeds or fails if the future fails.
val subreddits: Source[String] = Source(RedditAPI.popularSubreddits).mapConcat(identity)
Since popularSubreddits creates a Future[Seq[String]], we take the additional step of using mapConcat to flatten the resulting Source[Seq[String]] into a Source[String]. mapConcat 'Transforms each input element into a sequence of output elements that is then flattened into the output stream'. Since we already have a Source[Seq[T]], we just pass the identity function to mapConcat.
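On a stream, mapConcat(identity) plays the same role that flatMap(identity), or flatten, plays on an ordinary collection. Below is a plain-collection analogy with made-up subreddit names. Akka is not involved, and MapConcatAnalogy is a hypothetical name.

```scala
object MapConcatAnalogy {
  // Each element is itself a sequence, like the single Seq[String]
  // emitted by Source(RedditAPI.popularSubreddits).
  val batches: Vector[Seq[String]] = Vector(Seq("funny", "pics"), Seq("aww"))

  // Equivalent to flatMap(identity): emit every inner element, in order,
  // which is what mapConcat(identity) does for each stream element.
  val flattened: Vector[String] = batches.flatMap(batch => batch)

  // With a non-identity function, each element expands into derived values.
  val withLengths: Vector[(String, Int)] =
    batches.flatMap(batch => batch.map(name => name -> name.length))

  def main(args: Array[String]): Unit = {
    println(flattened) // Vector(funny, pics, aww)
    println(withLengths)
  }
}
```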
Try it out:
import akka.stream.scaladsl._
import com.`inanna-malick`._
import Main._
Source(RedditAPI.popularSubreddits).mapConcat(identity).runForeach(println)
This outputs:
--> started fetch popular subreddits at t0 + 608
<-- finished fetch popular subreddits after 166 millis
funny
AdviceAnimals
pics
aww
todayilearned
2 out of the top 5 subreddits are dedicated to pictures of cute animals (AdviceAnimals and aww). Insight!
A Sink[In] consumes elements of type In. Some sinks produce values on completion. For example, ForeachSinks produce a Future[Unit] that completes when the stream completes. FoldSinks, which fold some number of elements of type A into an initial value of type B using a function (B, A) => B, produce a Future[B] that completes when the stream completes.
This sink takes a stream of comments, converts them into (subreddit, wordcount) pairs, and merges those pairs into a Map[String, WordCount] that can be retrieved on stream completion.
val wordCountSink: FoldSink[Map[String, WordCount], Comment] =
  FoldSink(Map.empty[String, WordCount])(
    (acc: Map[String, WordCount], c: Comment) =>
      mergeWordCounts(acc, Map(c.subreddit -> c.toWordCount))
  )
This one's a bit harder to test. Instead of producing a stream of items that we can consume and print, it consumes comments and folds them together to produce a single value.
import akka.stream.scaladsl._
import com.`inanna-malick`._
import Main._
import scala.concurrent.Future
val comments = Vector(Comment("news", "hello world"),
                      Comment("news", "cruel world"),
                      Comment("funny", "hello world"))
val f: Future[Map[String, WordCount]] = Source(comments).runWith(wordCountSink)
f.onComplete(println)
The future completes successfully when the Sink finishes processing the last element produced by the Source, resulting in:
Success(Map(funny -> Map(world -> 1, hello -> 1), news -> Map(world -> 2, cruel -> 1, hello -> 1)))
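wordCountSink relies on Comment.toWordCount and mergeWordCounts, and this section doesn't define either of them. Below is one possible sketch. It assumes words are split on whitespace (case-sensitive, punctuation kept) and that counts for the same subreddit are summed. The sketch is hypothetical in two ways: toWordCount is a standalone function rather than a method on Comment, and foldLeft over a Vector stands in for FoldSink. For the three comments above, it produces a map equal to the stream result.

```scala
object WordCounts {
  type WordCount = Map[String, Int]

  // Local copy of the post's Comment type so this sketch compiles on its own.
  final case class Comment(subreddit: String, body: String)

  // Hypothetical toWordCount: split on runs of whitespace, count each word.
  def toWordCount(c: Comment): WordCount =
    c.body.split("\\s+").iterator
      .filter(_.nonEmpty)
      .foldLeft(Map.empty[String, Int]) { (acc, w) =>
        acc.updated(w, acc.getOrElse(w, 0) + 1)
      }

  // Sum two word counts key by key.
  private def addCounts(a: WordCount, b: WordCount): WordCount =
    b.foldLeft(a) { case (acc, (w, n)) => acc.updated(w, acc.getOrElse(w, 0) + n) }

  // Hypothetical mergeWordCounts: combine per-subreddit counts, summing shared words.
  def mergeWordCounts(a: Map[String, WordCount], b: Map[String, WordCount]): Map[String, WordCount] =
    b.foldLeft(a) { case (acc, (sub, wc)) =>
      acc.updated(sub, addCounts(acc.getOrElse(sub, Map.empty[String, Int]), wc))
    }

  // The same (B, A) => B fold that wordCountSink performs, on an in-memory Seq.
  def countAll(comments: Seq[Comment]): Map[String, WordCount] =
    comments.foldLeft(Map.empty[String, WordCount]) { (acc, c) =>
      mergeWordCounts(acc, Map(c.subreddit -> toWordCount(c)))
    }

  def main(args: Array[String]): Unit = {
    val comments = Vector(
      Comment("news", "hello world"),
      Comment("news", "cruel world"),
      Comment("funny", "hello world"))
    println(countAll(comments))
  }
}
```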
A Flow[In, Out] consumes elements of type In, applies some sequence of transformations, and emits elements of type Out.
This Flow takes subreddit names and emits popular links for each supplied subreddit name.
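Flow[String] creates a Flow[String, String], a pipeline that applies no transformations. via(throttle(redditAPIRate)) then routes elements through throttle, a Flow[T, T] that limits throughput to one message per redditAPIRate time units (its definition appears later in this post). mapAsyncUnordered calls RedditAPI.popularLinks for each subreddit and emits each resulting LinkListing as soon as its Future completes, not necessarily in input order. Finally, mapConcat flattens each listing into individual Links.
val fetchLinks: Flow[String, Link] =
  Flow[String]
    .via(throttle(redditAPIRate))
    .mapAsyncUnordered( subreddit => RedditAPI.popularLinks(subreddit) )
    .mapConcat( listing => listing.links )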
Let's test this out! Here we create a source using 4 subreddit names, pipe it through fetchLinks, and use runForeach to consume and print each element emitted by the resulting Source.
import akka.stream.scaladsl._
import com.`inanna-malick`.Main._
Source(Vector("funny", "sad", "politics", "news")).via(fetchLinks).runForeach(println)
This outputs:
--> started links: r/funny/top at t0 + 193
--> started links: r/sad/top at t0 + 486
<-- finished links: r/funny/top after 305 millis
Link(202wd3,funny)
(more links...)
Link(15jrds,funny)
<-- finished links: r/sad/top after 228 millis
Link(2hasrr,sad)
(more links...)
Link(w346r,sad)
--> started links: r/politics/top at t0 + 996
<-- finished links: r/politics/top after 349 millis
Link(1ryfk0,politics)
(more links...)
Link(1wxyyi,politics)
--> started links: r/news/top at t0 + 1495
<-- finished links: r/news/top after 141 millis
Link(2kp34z,news)
(more links...)
Link(2ooscv,news)
Note how the fetch links calls start at 193, 486, 996 and 1495 milliseconds. After the first gap (about 300 milliseconds), the throttle spaces the calls about 500 milliseconds apart. As each call completes with a LinkListing, the pipeline emits a batch of Link objects.
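For contrast, here is roughly what fetchLinks would look like with plain Futures and no stream. popularLinks below is a hypothetical stub that fabricates link IDs instead of calling Reddit, and Link is a local copy of the post's type. Future.traverse starts every request at once and keeps results in input order. Nothing limits how many calls are in flight, and that is the gap the throttle and backpressure fill in the streaming version.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FetchLinksWithoutStreams {
  // Local copy of the post's Link type so this sketch compiles on its own.
  final case class Link(id: String, subreddit: String)

  // Hypothetical stub standing in for RedditAPI.popularLinks: it fabricates
  // two link IDs per subreddit instead of calling Reddit.
  def popularLinks(subreddit: String): Future[Seq[Link]] =
    Future(Seq(Link(s"$subreddit-1", subreddit), Link(s"$subreddit-2", subreddit)))

  // Every request starts immediately: no throttle, no backpressure.
  // Results are kept in input order (unlike mapAsyncUnordered) and then
  // flattened, much like mapConcat.
  def fetchAll(subreddits: Seq[String]): Future[Seq[Link]] =
    Future.traverse(subreddits)(popularLinks).map(_.flatten)

  def main(args: Array[String]): Unit = {
    val links = Await.result(fetchAll(Seq("funny", "news")), 2.seconds)
    links.foreach(println)
  }
}
```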
This flow uses the same sequence of steps (with a different API call) to convert a stream of links into a stream of the most popular comments on those links.
val fetchComments: Flow[Link, Comment] =
  Flow[Link]
    .via(throttle(redditAPIRate))
    .mapAsyncUnordered( link => RedditAPI.popularComments(link) )
    .mapConcat( listing => listing.comments )
Let's test this flow with one of the links emitted by the previous test.
import akka.stream.scaladsl._
import com.`inanna-malick`._
import Main._
Source(Vector(Link("2ooscv","news"))).via(fetchComments).runForeach(println)
Source(Vector(Link("2ooscv","news"))) emits a single link that maps to this article: Illinois General Assembly passes bill to ban citizens from recording police. Piping that source through the fetchComments flow creates a Source[Comment] that fetches and emits the top comments on that link:
--> started comments: r/news/2ooscv/comments at t0 + 615
<-- finished comments: r/news/2ooscv/comments after 6104 millis
Comment(news,Ah i am sure it will be overturned eventually. What a waste of money and time.)
Comment(news,If you can't stop police from murdering people, stop people from finding out about it. )
Comment(news,If nobody videotapes the beatings, morale will definitely improve!)
(...many more comments)
Not everything can be expressed as a linear sequence of stream processing steps. The Akka Streams DSL provides tools for building stream processing graphs with nodes that have multiple inputs and outputs. In this case, we want to zip a fast stream together with a slow one, to throttle the throughput of the fast stream to that of the slow one.
First, here's the type signature of throttle: def throttle[T](rate: FiniteDuration): Flow[T, T]. The flow graph creation syntax can be a bit hard to follow, so to start, here's a flowchart showing the structure of the stream processing step we're going to build.
+------------+
| tickSource +-Unit-+
+------------+      +---> +-----+            +-----+      +-----+
                          | zip +-(T,Unit)-> | map +--T-> | out |
+----+              +---> +-----+            +-----+      +-----+
| in +----T---------+
+----+
And the code:
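def throttle[T](rate: FiniteDuration): Flow[T, T] = {
  // emits () once per rate, after an initial delay of rate
  val tickSource = TickSource(rate, rate, () => () )
  // pairs each element with a tick; emits only when both inputs have one
  val zip = Zip[T, Unit]
  // open ports, connected when the partial graph is turned into a Flow
  val in = UndefinedSource[T]
  val out = UndefinedSink[(T, Unit)]
  PartialFlowGraph{ implicit builder =>
    import FlowGraphImplicits._
    in ~> zip.left
    tickSource ~> zip.right
    zip.out ~> out
  }.toFlow(in, out).map{ case (t, _) => t } // drop the tick, keep the element
}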
Let's test it out:
import akka.stream.scaladsl._
import com.`inanna-malick`._
import Main._
import scala.concurrent.duration._
Source((1 to 10).toVector).via(throttle[Int](500 millis)).runForeach{ n => println(s"$n @ ${System.currentTimeMillis}")}
yields:
1 @ 1421640216506
2 @ 1421640217006
3 @ 1421640217506
4 @ 1421640218006
5 @ 1421640218507
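The roughly 500 ms spacing above comes from the zip. It can only emit once both of its inputs have an element, so the output is paced by the slower tick input. Below is a pure model of that timing. It assumes ticks arrive every rateMillis starting at rateMillis (as with TickSource(rate, rate, ...)) and that every element is already waiting. ThrottleModel is a hypothetical helper, not part of Akka.

```scala
object ThrottleModel {
  // Tick i (1-based) fires at i * rateMillis, mirroring TickSource(rate, rate, ...).
  def ticks(rateMillis: Long): LazyList[Long] =
    LazyList.iterate(rateMillis)(_ + rateMillis)

  // Zip pairs the n-th element with the n-th tick, so element n cannot be
  // emitted before tick n. Zipping stops when the finite element Seq runs out.
  def schedule[T](elems: Seq[T], rateMillis: Long): Seq[(T, Long)] =
    elems.zip(ticks(rateMillis))

  def main(args: Array[String]): Unit =
    schedule(1 to 5, 500L).foreach { case (n, t) => println(s"$n @ t0 + $t") }
}
```

In this model, element n goes out at n * 500 ms. That matches the spacing of the real run, whose timestamps also include startup latency.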
Just for fun, let's remove the throttle:
import akka.stream.scaladsl._
import com.`inanna-malick`.Main._
Source((1 to 1000).toVector).runForeach{ n => println(s"$n @ ${System.currentTimeMillis}")}
Without the throttle, 1000 elements are consumed within 64 ms.
1 @ 1421641414420
...
1000 @ 1421641414484
Finally, we combine these steps to create a description of a stream processing graph, which we materialize and run with .runWith().
val res: Future[Map[String, WordCount]] =
  subreddits
    .via(fetchLinks)
    .via(fetchComments)
    .runWith(wordCountSink)
res.onComplete(writeResults)
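writeResults isn't shown in this post. Below is a hypothetical sketch that persists each subreddit's counts as a tab-separated file, with the most frequent word first. The directory layout and file format are assumptions, not the post's actual output.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}
import scala.util.{Failure, Success, Try}

object Results {
  type WordCount = Map[String, Int]

  // Render one subreddit's counts as "word<TAB>count" lines,
  // most frequent first, ties broken alphabetically.
  def render(wc: WordCount): String =
    wc.toSeq
      .sortBy { case (word, count) => (-count, word) }
      .map { case (word, count) => s"$word\t$count" }
      .mkString("", "\n", "\n")

  // Hypothetical writeResults: one <subreddit>.tsv file per subreddit under dir.
  // Once dir is applied, it has the Try[...] => Unit shape onComplete expects.
  def writeResults(dir: Path)(result: Try[Map[String, WordCount]]): Unit = result match {
    case Success(counts) =>
      Files.createDirectories(dir)
      counts.foreach { case (subreddit, wc) =>
        Files.write(dir.resolve(s"$subreddit.tsv"), render(wc).getBytes(StandardCharsets.UTF_8))
      }
    case Failure(e) =>
      System.err.println(s"word count stream failed: $e")
  }
}
```

With this sketch, the last line above could read res.onComplete(Results.writeResults(Paths.get("wordcounts"))). The directory name is again only an assumption.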