Project Name | Stars | Downloads / Repos / Packages Using This | Most Recent Commit | Total Releases | Latest Release | Open Issues | License | Language | Description |
---|---|---|---|---|---|---|---|---|---|
Killrweather | 1,174 | | 6 years ago | | | 23 | apache-2.0 | Scala | KillrWeather is a reference application (work in progress) showing how to easily integrate streaming and batch data processing with Apache Spark Streaming, Apache Cassandra, Apache Kafka and Akka for fast, streaming computations on time series data in asynchronous event-driven environments. |
Utils4s | 1,033 | | 4 years ago | | | 5 | | Scala | A collection of test cases and related material gathered while using Scala and Spark |
Bahir | 325 | 5 | 3 months ago | 1 | September 13, 2019 | 7 | apache-2.0 | Scala | Mirror of Apache Bahir |
Cloudflow | 323 | 7 | a month ago | 604 | October 03, 2022 | 125 | apache-2.0 | Scala | Cloudflow enables users to quickly develop, orchestrate, and operate distributed streaming applications on Kubernetes. |
Every Single Day I Tldr | 304 | | 3 days ago | | | | | | A daily digest of the articles or videos I've found interesting, that I want to share with you. |
Akka Analytics | 281 | | 7 years ago | | | | apache-2.0 | Scala | Large-scale event processing with Akka Persistence and Apache Spark |
Java_learning_practice | 118 | | a year ago | | | 3 | | Java | The road to advanced Java: frequently asked interview algorithms, Akka, multithreading, NIO, Netty, SpringBoot, Spark && Flink, etc. |
User Guide Smack | 66 | | 6 years ago | | | | other | Scala | [Cloudframeworks] SMACK Big Data Architecture - user guide |
Busfloatingdata | 59 | 1 | 5 years ago | 1 | September 25, 2016 | | apache-2.0 | Scala | Showcase for IoT Platform Blog |
Model Serving Tutorial | 53 | | 4 years ago | | | | apache-2.0 | Scala | Code and presentation for Strata Model Serving tutorial |
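Large-scale event processing with Akka Persistence and Apache Spark. At the moment you can

- batch-process events from an Akka Persistence Cassandra journal as a Spark `RDD`.
- stream-process events from an Akka Persistence Kafka journal as a Spark Streaming `DStream`.

Dependencies (sbt):

```scala
// note: Bintray was shut down in 2021, so this resolver may no longer be reachable
resolvers += "krasserm at bintray" at "http://dl.bintray.com/krasserm/maven"

libraryDependencies ++= Seq(
  "com.github.krasserm" %% "akka-analytics-cassandra" % "0.3.1",
  "com.github.krasserm" %% "akka-analytics-kafka" % "0.3.1"
)
```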
With `akka-analytics-cassandra` you can expose and process events written by all persistent actors as a resilient distributed dataset (`RDD`). It uses the Spark Cassandra Connector to fetch data from the Cassandra journal. Here's a primitive example:
```scala
import akka.analytics.cassandra._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

val conf = new SparkConf()
  .setAppName("CassandraExample")
  .setMaster("local[4]")
  .set("spark.cassandra.connection.host", "127.0.0.1")
val sc = new SparkContext(conf)

// expose journaled Akka Persistence events as RDD
val rdd: RDD[(JournalKey, Any)] = sc.eventTable().cache()

// and do some processing, e.g. print the events of persistent actor "a"
// (the persistenceId "a" is only an example)
rdd.sortByKey()
  .filter { case (key, _) => key.persistenceId == "a" }
  .map { case (key, event) => s"${key.sequenceNr}: $event" }
  .collect()
  .foreach(println)

sc.stop()
```
The dataset generated by `eventTable()` is of type `RDD[(JournalKey, Any)]`, where `Any` represents the persisted event (see also Akka Persistence API) and `JournalKey` is defined as:
```scala
package akka.analytics.cassandra

case class JournalKey(persistenceId: String, partition: Long, sequenceNr: Long)
```
Events for a given `persistenceId` are partitioned across nodes in the Cassandra cluster, where the partition is represented by the `partition` field in the key. The `eventTable()` method returns an `RDD` in which events with the same `persistenceId`-`partition` combination (= cluster partition) are ordered by increasing `sequenceNr`, but the ordering across cluster partitions is not defined. If needed, the `RDD` can be sorted with `sortByKey()` by `persistenceId`, `partition` and `sequenceNr`, in that order of significance. Note that the default size of a cluster partition in the Cassandra journal is `5000000` events (see akka-persistence-cassandra).
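The ordering rules above can be illustrated without Spark or Cassandra. The following is a minimal sketch that uses a local copy of `JournalKey` and a hypothetical `Ordering` comparing `persistenceId`, then `partition`, then `sequenceNr`, i.e. the order of significance that `sortByKey()` is described as using. It also assumes, purely for illustration, that sequence numbers are assigned to partitions in consecutive blocks of 5000000, so that a key's partition is `(sequenceNr - 1) / 5000000`; the actual scheme is defined by akka-persistence-cassandra. The helper names `partitionFor` and `key` are hypothetical.

```scala
object JournalKeyOrderingSketch {
  // Local copy mirroring akka.analytics.cassandra.JournalKey
  final case class JournalKey(persistenceId: String, partition: Long, sequenceNr: Long)

  // Default cluster partition size of the Cassandra journal (events per partition)
  val PartitionSize: Long = 5000000L

  // Hypothetical helper. Assumption for illustration: consecutive blocks of
  // PartitionSize sequence numbers (starting at 1) share one partition
  def partitionFor(sequenceNr: Long): Long = (sequenceNr - 1L) / PartitionSize

  // Hypothetical helper that builds a key with its assumed partition
  def key(persistenceId: String, sequenceNr: Long): JournalKey =
    JournalKey(persistenceId, partitionFor(sequenceNr), sequenceNr)

  // Hypothetical ordering: persistenceId, then partition, then sequenceNr
  implicit val journalKeyOrdering: Ordering[JournalKey] =
    Ordering.by((k: JournalKey) => (k.persistenceId, k.partition, k.sequenceNr))

  def main(args: Array[String]): Unit = {
    // Keys as they might arrive from different cluster partitions (no defined order)
    val unordered = Seq(key("b", 2L), key("a", 5000001L), key("b", 1L), key("a", 1L))
    // Local equivalent of sortByKey()
    unordered.sorted.foreach(println)
  }
}
```

Running `main` prints the keys of `a` first (partition 0 before partition 1), followed by the keys of `b` in increasing `sequenceNr` order.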
With `akka-analytics-kafka` you can expose and process events written by all persistent actors (more specifically, from any user-defined topic) as a discretized stream (`DStream`). Here's a primitive example:
```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

import akka.analytics.kafka._
import akka.persistence.kafka.Event

val sparkConf = new SparkConf()
  .setAppName("events-consumer")
  .setMaster("local[4]")

// read from user-defined events topic
// with 2 threads (see also Kafka API)
val topics = Map("events" -> 2)
val params = Map[String, String](
  "group.id" -> "events-consumer",
  "auto.commit.enable" -> "false",
  "auto.offset.reset" -> "smallest",
  "zookeeper.connect" -> "localhost:2181",
  "zookeeper.connection.timeout.ms" -> "10000")

val ssc = new StreamingContext(sparkConf, Seconds(1))
val es: DStream[Event] = ssc.eventStream(params, topics)

// process each micro-batch, e.g. print the events of persistent actor "a"
// (the persistenceId "a" is only an example)
es.foreachRDD { rdd =>
  rdd.filter(_.persistenceId == "a")
    .map(e => s"${e.sequenceNr}: ${e.data}")
    .collect()
    .foreach(println)
}

ssc.start()
ssc.awaitTermination()
```
The stream generated by `eventStream(...)` is of type `DStream[Event]`, where `Event` is defined in akka-persistence-kafka as:
```scala
package akka.persistence.kafka

/**
 * Event published to user-defined topics.
 *
 * @param persistenceId Id of the persistent actor that generates event `data`.
 * @param sequenceNr Sequence number of the event.
 * @param data Event data generated by a persistent actor.
 */
case class Event(persistenceId: String, sequenceNr: Long, data: Any)
```
The stream of events (written by all persistent actors) is partially ordered, i.e. events with the same `persistenceId` are ordered by `sequenceNr`, whereas the ordering of events with different `persistenceId`s is not defined. The Kafka consumer `params` are described in the Kafka consumer configuration documentation.
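Because only events with the same `persistenceId` are ordered, consumers should rely on ordering per persistent actor, not across actors. The following minimal sketch makes that guarantee concrete: it uses a local copy of `Event` (so it runs without Kafka or Spark) and a hypothetical helper `isPartiallyOrdered` that checks, for each `persistenceId`, that its events appear with increasing `sequenceNr`, while ignoring the interleaving of different actors. The sample events are made up.

```scala
object EventOrderingSketch {
  // Local copy mirroring akka.persistence.kafka.Event
  final case class Event(persistenceId: String, sequenceNr: Long, data: Any)

  // Hypothetical helper: true if, for every persistenceId, events appear with
  // increasing sequenceNr. The relative order of different actors is ignored.
  def isPartiallyOrdered(events: Seq[Event]): Boolean =
    events.map(_.persistenceId).distinct.forall { id =>
      // filter preserves arrival order, so this is the per-actor sub-sequence
      val seqNrs = events.filter(_.persistenceId == id).map(_.sequenceNr)
      // compare each sequence number with its successor
      seqNrs.zip(seqNrs.drop(1)).forall { case (a, b) => a < b }
    }

  def main(args: Array[String]): Unit = {
    // Made-up interleaved events of two persistent actors "a" and "b"
    val received = Seq(
      Event("b", 1L, "b-1"),
      Event("a", 1L, "a-1"),
      Event("b", 2L, "b-2"),
      Event("a", 2L, "a-2"))
    println(isPartiallyOrdered(received)) // prints true
  }
}
```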
If events have been persisted with a custom serializer, the corresponding Akka serializer configuration must be specified for event processing. For event batch processing this is done as follows:
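```scala
import akka.actor.ActorSystem
import akka.analytics.cassandra._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
val system: ActorSystem = ActorSystem("example") // its config must contain the custom serializer configuration
val sparkConfig = new SparkConf().setAppName("CassandraExample").setMaster("local[4]")
val jsc: JournalSparkContext =
  new SparkContext(sparkConfig).withSerializerConfig(system.settings.config)
val rdd: RDD[(JournalKey, Any)] = jsc.eventTable()
// ...
jsc.context.stop()
```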
For event stream processing this is done in a similar way:
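```scala
import akka.actor.ActorSystem
import akka.analytics.kafka._
import akka.persistence.kafka.Event
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
val system: ActorSystem = ActorSystem("example") // its config must contain the custom serializer configuration
val sparkConfig = new SparkConf().setAppName("events-consumer").setMaster("local[4]")
val jsc: JournalStreamingContext =
  new StreamingContext(sparkConfig, Seconds(1)).withSerializerConfig(system.settings.config)
// kafkaParams and kafkaTopics: Kafka consumer params and topics, as `params` and `topics` in the example above
val es: DStream[Event] = jsc.eventStream(kafkaParams, kafkaTopics)
// ...
jsc.context.start()
// ...
jsc.context.stop()
```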
Complete, runnable examples are `akka.analytics.cassandra.CustomSerializationSpec` and `akka.analytics.kafka.CustomSerializationSpec`.
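The serializer configuration passed to `withSerializerConfig` is ordinary Akka configuration (`system.settings.config`). As a minimal sketch of what such configuration can look like, the block below builds it with the Typesafe Config library (`com.typesafe:config`, which Akka depends on) and reads the binding back. The event class `com.example.MyEvent`, the serializer class `com.example.MyEventSerializer` and the serializer name `my-event` are hypothetical; in a real application these entries would usually live in `application.conf`, where the `ActorSystem` picks them up.

```scala
import com.typesafe.config.{Config, ConfigFactory}

object SerializerConfigSketch {
  // Hypothetical serializer and event class names, bound via Akka's
  // akka.actor.serializers / akka.actor.serialization-bindings settings
  val serializerConfig: Config = ConfigFactory.parseString(
    """
      |akka.actor {
      |  serializers {
      |    my-event = "com.example.MyEventSerializer"
      |  }
      |  serialization-bindings {
      |    "com.example.MyEvent" = my-event
      |  }
      |}
      |""".stripMargin)

  def main(args: Array[String]): Unit = {
    // Quoted path segment, because the class name itself contains dots
    val binding = serializerConfig.getString("akka.actor.serialization-bindings.\"com.example.MyEvent\"")
    // Look up the serializer class registered under that name
    println(s"$binding -> ${serializerConfig.getString(s"akka.actor.serializers.$binding")}")
  }
}
```

Running `main` prints the serializer name bound to the event class together with the serializer class it refers to.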