Assembler

Functional, type-safe, stateless reactive Java API for efficient implementation of the API Composition Pattern for querying/merging data from multiple datasources/services, with a specific focus on solving the N + 1 query problem

The Assembler Library is a reactive, functional, type-safe, and stateless Java API that enables efficient implementation of the API Composition Pattern for querying and merging data from multiple data sources/services. This library is also designed to solve the N + 1 query problem and is architecture-agnostic, allowing it to be used as part of a monolithic or microservice architecture.

Internally, the library leverages Project Reactor to implement end-to-end reactive stream pipelines that preserve the properties described by the Reactive Manifesto: responsiveness, resilience, elasticity, and message-driven communication with non-blocking back-pressure.

Use Cases

The Assembler library can be used in situations where an application needs to access data or functionality that is spread across multiple services. Some common use cases include:

  1. CQRS/Event Sourcing: The Assembler library can be used on the read side of a CQRS and Event Sourcing architecture to efficiently build materialized views that aggregate data from multiple sources.
  2. API Gateway: The Assembler library can be used in conjunction with an API Gateway, which acts as a single entry point for all client requests. The API Gateway can combine multiple APIs into a single, unified API, simplifying the client's interactions with the underlying services.
  3. Backends for Frontends: The Assembler library can also be used in conjunction with Backends for Frontends (BFFs). A BFF is a dedicated backend service that provides a simplified and optimized API specifically tailored for a particular client or group of clients.
  4. Reduce network overhead: By combining multiple APIs into a single API, the Assembler library can reduce the amount of network traffic required for a client to complete a task. This can improve the performance of the client application and reduce the load on the server.
  5. Solve the N + 1 Query Problem: Instead of issuing one query per entity, the Assembler library fetches the related data for a whole batch of entities with a single query per data source. This reduces both the number of requests and the number of database queries, further optimizing the application's performance.

Basic Usage

Here is an example of how to use the Assembler Library to generate transaction information from a list of customers of an online store. This example assumes the following fictional data model and API to access different services:

public record Customer(Long customerId, String name) {}

public record BillingInfo(Long id, Long customerId, String creditCardNumber) {
    
  public BillingInfo(Long customerId) {
    this(null, customerId, "0000 0000 0000 0000");
  }
}

public record OrderItem(String id, Long customerId, String orderDescription, Double price) {}

public record Transaction(Customer customer, BillingInfo billingInfo, List<OrderItem> orderItems) {}

Flux<Customer> getCustomers(); // e.g. call to a microservice or a Flux connected to a Kafka source
Flux<BillingInfo> getBillingInfo(List<Long> customerIds); // e.g. connects to relational database (R2DBC)
Flux<OrderItem> getAllOrders(List<Long> customerIds); // e.g. connects to MongoDB

In cases where the getCustomers() method returns a substantial number of customers, retrieving the associated BillingInfo for each customer would require an additional call per customerId. This would result in a considerable increase in network calls, causing the N + 1 query problem. To mitigate this, we can retrieve the BillingInfo for all the customers returned by getCustomers() with a single additional call. The same approach can be used for retrieving OrderItem information.
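
To make the difference concrete, here is a minimal plain-Java sketch (no Reactor, no Assembler) that counts calls to a simulated billing service. The class NPlusOneDemo and its in-memory getBillingInfo() are hypothetical stand-ins for the remote call above; they only show that a per-customer loop makes one call per customer, while collecting the correlation ids first needs a single call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: contrasts per-id lookups (N + 1 pattern) with one batched lookup.
// The "service" is simulated in memory and only counts how many calls it receives.
class NPlusOneDemo {

    record Customer(Long customerId, String name) {}
    record BillingInfo(Long id, Long customerId, String creditCardNumber) {}

    // Counts calls made to the simulated billing service.
    static final AtomicInteger billingCalls = new AtomicInteger();

    // Simulated batch endpoint: one call returns billing info for every requested id.
    static List<BillingInfo> getBillingInfo(List<Long> customerIds) {
        billingCalls.incrementAndGet();
        List<BillingInfo> result = new ArrayList<>();
        for (Long id : customerIds) {
            result.add(new BillingInfo(id * 10, id, "0000 0000 0000 0000"));
        }
        return result;
    }

    // N + 1 style: one billing call per customer (the "1" call that fetched customers is not shown).
    static int callsPerCustomer(List<Customer> customers) {
        billingCalls.set(0);
        for (Customer c : customers) {
            getBillingInfo(List.of(c.customerId()));
        }
        return billingCalls.get();
    }

    // Batched style: collect all correlation ids first, then make a single call.
    static int batchedCalls(List<Customer> customers) {
        billingCalls.set(0);
        List<Long> ids = customers.stream().map(Customer::customerId).toList();
        getBillingInfo(ids);
        return billingCalls.get();
    }
}
```

With three customers, the per-customer loop makes three billing calls while the batched version makes one, regardless of how many customers there are.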

As we are working with three distinct and independent data sources, the process of joining data from Customer, BillingInfo, and OrderItem into a Transaction must be performed at the application level. This is the primary objective of this library.

When utilizing the Assembler Library, the aggregation of multiple reactive data sources and the implementation of the API Composition Pattern can be accomplished as follows:

import reactor.core.publisher.Flux;
import io.github.pellse.reactive.assembler.Assembler;
import static io.github.pellse.reactive.assembler.AssemblerBuilder.assemblerOf;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToMany;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToOne;
import static io.github.pellse.reactive.assembler.Rule.rule;
    
Assembler<Customer, Flux<Transaction>> assembler = assemblerOf(Transaction.class)
  .withCorrelationIdExtractor(Customer::customerId)
  .withAssemblerRules(
    rule(BillingInfo::customerId, oneToOne(this::getBillingInfo)),
    rule(OrderItem::customerId, oneToMany(OrderItem::id, this::getAllOrders)),
    Transaction::new)
  .build();

Flux<Transaction> transactionFlux = assembler.assemble(getCustomers());

The code snippet above first retrieves all customers, then concurrently retrieves all billing information and all orders associated with those customers (with a single query per data source), as defined by the assembler rules. Finally, each customer, their billing information, and their list of order items (related by the same customer id) are aggregated into a Transaction object. The result is a reactive stream (Flux) of Transaction objects.
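
Conceptually, the rules describe an in-memory join keyed by the correlation id. The following plain-Java sketch, assuming all three result sets are already available as lists, shows that join for the same data model. ApplicationLevelJoin and assemble() are illustrative names; the library performs this reactively rather than on materialized lists.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical sketch of an application-level join by correlation id (customerId).
// Not the library's implementation; it only mirrors what the assembler rules express.
class ApplicationLevelJoin {

    record Customer(Long customerId, String name) {}
    record BillingInfo(Long id, Long customerId, String creditCardNumber) {}
    record OrderItem(String id, Long customerId, String orderDescription, Double price) {}
    record Transaction(Customer customer, BillingInfo billingInfo, List<OrderItem> orderItems) {}

    static List<Transaction> assemble(List<Customer> customers,
                                      List<BillingInfo> billingInfos,
                                      List<OrderItem> orderItems) {
        // oneToOne: assumes at most one BillingInfo per customerId (toMap throws on duplicates)
        Map<Long, BillingInfo> billingByCustomer = billingInfos.stream()
            .collect(Collectors.toMap(BillingInfo::customerId, Function.identity()));

        // oneToMany: all OrderItem entries grouped per customerId
        Map<Long, List<OrderItem>> ordersByCustomer = orderItems.stream()
            .collect(Collectors.groupingBy(OrderItem::customerId));

        // This sketch keeps the order of the input customers.
        // A missing BillingInfo stays null; missing orders become an empty list.
        return customers.stream()
            .map(c -> new Transaction(
                c,
                billingByCustomer.get(c.customerId()),
                ordersByCustomer.getOrDefault(c.customerId(), List.of())))
            .toList();
    }
}
```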

Default values for missing data

To provide a default value for each value missing from the result of the API call, a factory function can also be supplied as a second parameter to the oneToOne() function. For example, when getCustomers() returns three customers [C1, C2, C3] and getBillingInfo([ID1, ID2, ID3]) returns only two associated BillingInfo objects [B1, B2], the missing value B3 can be generated as a default value. This way, a null BillingInfo is never passed to the Transaction constructor:

rule(BillingInfo::customerId, oneToOne(this::getBillingInfo, customerId -> new BillingInfo(customerId)))

or more concisely:

rule(BillingInfo::customerId, oneToOne(this::getBillingInfo, BillingInfo::new))

Unlike oneToOne(), oneToMany() always defaults to an empty collection, so providing a default factory function is not needed. In the example above, an empty List<OrderItem> is passed to the Transaction constructor for any customer for whom getAllOrders([1, 2, 3]) returns no OrderItem.
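
A plain-Java sketch of these two defaulting rules, assuming the query results have already been indexed by customer id. oneToOneOrDefault() and oneToManyOrEmpty() are hypothetical helpers, not part of the library's API.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of the "default for missing data" semantics using plain maps.
class MissingDataDefaults {

    record BillingInfo(Long id, Long customerId, String creditCardNumber) {
        // Same convenience constructor as in the data model above
        BillingInfo(Long customerId) {
            this(null, customerId, "0000 0000 0000 0000");
        }
    }

    record OrderItem(String id, Long customerId, String orderDescription, Double price) {}

    // oneToOne-style lookup: fall back to a factory-generated default when the id is missing.
    static <ID, R> R oneToOneOrDefault(Map<ID, R> results, ID id, Function<ID, R> defaultFactory) {
        R value = results.get(id);
        return value != null ? value : defaultFactory.apply(id);
    }

    // oneToMany-style lookup: an empty list is the natural default, so no factory is needed.
    static <ID, R> List<R> oneToManyOrEmpty(Map<ID, List<R>> results, ID id) {
        return results.getOrDefault(id, List.of());
    }
}
```

Passing BillingInfo::new as the factory resolves to the one-argument constructor, which is the same shorthand used in the rule above.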

Infinite Stream of Data

In situations where an infinite or very large stream of data is being handled, such as dealing with 100,000+ customers, the Assembler Library needs to completely drain the upstream from getCustomers() to gather all correlation IDs (customerId). This can lead to resource exhaustion if not handled correctly. To mitigate this issue, the stream can be split into multiple smaller streams and processed in batches. Most reactive libraries already support this concept. Below is an example of this approach, utilizing Project Reactor:

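import reactor.core.publisher.Flux;
import static java.time.Duration.ofSeconds;

Flux<Transaction> transactionFlux = getCustomers()
  .windowTimeout(100, ofSeconds(5)) // windows of at most 100 customers or 5 seconds
  .flatMapSequential(assembler::assemble);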
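
For intuition, here is what size-based batching looks like without a reactive library. This is a hypothetical sketch over a plain Iterator: it only bounds the batch size, whereas windowTimeout(100, ofSeconds(5)) also closes a window after 5 seconds.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch: consume the upstream in bounded chunks instead of draining it completely.
class Batching {

    static <T> void inBatches(Iterator<T> source, int batchSize, Consumer<List<T>> batchHandler) {
        List<T> batch = new ArrayList<>(batchSize);
        while (source.hasNext()) {
            batch.add(source.next());
            if (batch.size() == batchSize) {
                batchHandler.accept(batch);          // hand off a full batch
                batch = new ArrayList<>(batchSize);  // start a fresh one
            }
        }
        if (!batch.isEmpty()) {
            batchHandler.accept(batch); // flush the last, partial batch
        }
    }
}
```

Each batch can then be assembled independently, so at most batchSize correlation ids need to be held at a time.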

Reactive Caching

Apart from offering convenient helper functions to define mapping semantics such as oneToOne() and oneToMany(), the Assembler library also includes a caching/memoization mechanism for the downstream subqueries via the cached() wrapper function:

import io.github.pellse.reactive.assembler.Assembler;
import static io.github.pellse.reactive.assembler.AssemblerBuilder.assemblerOf;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToMany;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToOne;
import static io.github.pellse.reactive.assembler.Rule.rule;
import static io.github.pellse.reactive.assembler.caching.CacheFactory.cached;
    
var assembler = assemblerOf(Transaction.class)
  .withCorrelationIdExtractor(Customer::customerId)
  .withAssemblerRules(
    rule(BillingInfo::customerId, oneToOne(cached(this::getBillingInfo))),
    rule(OrderItem::customerId, oneToMany(OrderItem::id, cached(this::getAllOrders))),
    Transaction::new)
  .build();
    
var transactionFlux = getCustomers()
  .window(3)
  .flatMapSequential(assembler::assemble);
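
As an illustration of the memoization idea, here is a hypothetical, synchronous, single-threaded sketch for a one-to-one query: ids already in the cache are served locally and only the missing ids are sent to the underlying batch query. It is not how cached() is implemented; the library's version is reactive and non-blocking.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of memoizing a batch query (one result per id).
// Not thread-safe; for illustration only.
class MemoizedBatchQuery<ID, R> {

    private final Function<List<ID>, List<R>> query;
    private final Function<R, ID> idExtractor;
    private final Map<ID, R> cache = new HashMap<>();
    int remoteCalls = 0; // number of times the underlying query was invoked

    MemoizedBatchQuery(Function<List<ID>, List<R>> query, Function<R, ID> idExtractor) {
        this.query = query;
        this.idExtractor = idExtractor;
    }

    List<R> fetch(List<ID> ids) {
        // Only ids not seen before are sent to the underlying query
        List<ID> missing = ids.stream().filter(id -> !cache.containsKey(id)).toList();
        if (!missing.isEmpty()) {
            remoteCalls++;
            for (R r : query.apply(missing)) {
                cache.put(idExtractor.apply(r), r);
            }
        }
        // Answer the full request from the cache, in the requested order
        List<R> result = new ArrayList<>();
        for (ID id : ids) {
            R r = cache.get(id);
            if (r != null) result.add(r);
        }
        return result;
    }
}
```

Across successive windows, such as the .window(3) batches above, ids that were already fetched no longer trigger a call to the underlying query.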

Pluggable Reactive Caching Strategies

The cached() function includes overloaded versions that enable users to utilize different Cache implementations. By providing an additional parameter of type CacheFactory to the cached() method, users can customize the caching mechanism as per their requirements. In case no CacheFactory parameter is passed to cached(), the default implementation will internally use a Cache based on HashMap.

All Cache implementations are internally decorated with non-blocking concurrency controls, making them safe for concurrent access and modifications.

Here is an example showing how users can explicitly customize the caching mechanism, e.g. by storing cache entries in a TreeMap:

import io.github.pellse.reactive.assembler.Assembler;
import static io.github.pellse.reactive.assembler.AssemblerBuilder.assemblerOf;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToMany;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToOne;
import static io.github.pellse.reactive.assembler.Rule.rule;
import static io.github.pellse.reactive.assembler.caching.CacheFactory.cache;
import static io.github.pellse.reactive.assembler.caching.CacheFactory.cached;
    
var assembler = assemblerOf(Transaction.class)
  .withCorrelationIdExtractor(Customer::customerId)
  .withAssemblerRules(
    rule(BillingInfo::customerId, oneToOne(cached(this::getBillingInfo, cache(TreeMap::new)))),
    rule(OrderItem::customerId, oneToMany(OrderItem::id, cached(this::getAllOrders, cache(TreeMap::new)))),
    Transaction::new)
  .build();
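
The idea of plugging in the backing map can be sketched in plain Java as follows. PluggableCache is a hypothetical class; for simplicity it makes the map thread-safe with a lock-based Collections.synchronizedMap() wrapper, which is a swapped-in technique and not the non-blocking concurrency control the library describes.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Supplier;

// Hypothetical sketch of a cache whose backing Map comes from a factory (e.g. TreeMap::new).
// Thread safety here is lock-based via Collections.synchronizedMap().
class PluggableCache<K, V> {

    private final Map<K, V> delegate;

    PluggableCache(Supplier<Map<K, V>> mapFactory) {
        this.delegate = Collections.synchronizedMap(mapFactory.get());
    }

    // Default strategy: HashMap-backed, like cached() without a CacheFactory.
    static <K, V> PluggableCache<K, V> defaultCache() {
        return new PluggableCache<>(HashMap::new);
    }

    void putAll(Map<K, V> entries) {
        delegate.putAll(entries);
    }

    V get(K key) {
        return delegate.get(key);
    }

    List<K> keys() {
        // Iterating a synchronizedMap view must hold the map's lock
        synchronized (delegate) {
            return List.copyOf(delegate.keySet());
        }
    }
}
```

With TreeMap::new as the factory, keys() returns entries in ascending key order; with the default HashMap, no ordering is guaranteed.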

Third Party Reactive Cache Provider Integration

Below is a compilation of supplementary modules that are available for integration with third-party caching libraries. Additional modules will be incorporated in the future:

Assembler add-on module                  | Third-party cache library
Caffeine add-on module (Maven Central)   | Caffeine

Here is an example of a CacheFactory backed by the Caffeine library, obtained through the caffeineCache() helper method provided by the Caffeine add-on module:

import com.github.benmanes.caffeine.cache.Caffeine;
import static com.github.benmanes.caffeine.cache.Caffeine.newBuilder;

import static io.github.pellse.reactive.assembler.AssemblerBuilder.assemblerOf;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToMany;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToOne;
import static io.github.pellse.reactive.assembler.Rule.rule;
import static io.github.pellse.reactive.assembler.caching.CacheFactory.cached;
import static io.github.pellse.reactive.assembler.cache.caffeine.CaffeineCacheFactory.caffeineCache;
import static java.time.Duration.ofMinutes;

Caffeine<Object, Object> cacheBuilder = newBuilder()
  .recordStats()
  .expireAfterWrite(ofMinutes(10))
  .maximumSize(1000);

var assembler = assemblerOf(Transaction.class)
  .withCorrelationIdExtractor(Customer::customerId)
  .withAssemblerRules(
    rule(BillingInfo::customerId, oneToOne(cached(this::getBillingInfo, caffeineCache(cacheBuilder)))),
    rule(OrderItem::customerId, oneToMany(OrderItem::id, cached(this::getAllOrders, caffeineCache()))),
    Transaction::new)
  .build();

Auto Caching

In addition to the cache mechanism provided by the cached() function, the Assembler Library provides a mechanism, via the autoCache() function, to automatically and asynchronously update the cache in real time as new data becomes available. This keeps the cache up to date and, in most cases, avoids the need for cached() to fall back to fetching missing data.

The auto caching mechanism in the Assembler Library can be seen as being conceptually similar to a KTable in Kafka. Both mechanisms provide a way to keep a key-value store updated in real-time with the latest value per key from its associated data stream. However, the Assembler Library is not limited to just Kafka data sources and can work with any data source that can be consumed in a reactive stream.
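
The "latest value per key" semantics can be illustrated with a small plain-Java sketch, assuming a finite List in place of the reactive stream. LatestValuePerKey is a hypothetical name; the library itself consumes the stream asynchronously.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of materializing a stream of updates into a key-value table,
// keeping only the most recent value per key (KTable-like semantics).
class LatestValuePerKey {

    static <K, V> Map<K, V> materialize(List<V> updates, Function<V, K> keyExtractor) {
        Map<K, V> table = new HashMap<>();
        for (V update : updates) {
            // A newer value for the same key overwrites the older one
            table.put(keyExtractor.apply(update), update);
        }
        return table;
    }
}
```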

This is how autoCache() connects to a data stream and automatically and asynchronously updates the cache in real time:

import reactor.core.publisher.Flux;
import io.github.pellse.reactive.assembler.Assembler;
import static io.github.pellse.reactive.assembler.AssemblerBuilder.assemblerOf;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToMany;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToOne;
import static io.github.pellse.reactive.assembler.Rule.rule;
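import static io.github.pellse.reactive.assembler.caching.CacheFactory.cached;
import static io.github.pellse.reactive.assembler.caching.AutoCacheFactory.autoCache;
import static io.github.pellse.reactive.assembler.cache.caffeine.CaffeineCacheFactory.caffeineCache;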

Flux<BillingInfo> billingInfoFlux = ... // From e.g. Debezium/Kafka, RabbitMQ, etc.;
Flux<OrderItem> orderItemFlux = ... // From e.g. Debezium/Kafka, RabbitMQ, etc.;

var assembler = assemblerOf(Transaction.class)
  .withCorrelationIdExtractor(Customer::customerId)
  .withAssemblerRules(
    rule(BillingInfo::customerId,
      oneToOne(cached(this::getBillingInfo, caffeineCache(), autoCache(billingInfoFlux)))),
    rule(OrderItem::customerId,
      oneToMany(OrderItem::id, cached(this::getAllOrders, autoCache(orderItemFlux)))),
    Transaction::new)
  .build();
    
var transactionFlux = getCustomers()
  .window(3)
  .flatMapSequential(assembler::assemble);

It is also possible to customize the Auto Caching configuration via autoCacheBuilder():

import reactor.core.publisher.Flux;
import io.github.pellse.reactive.assembler.Assembler;
import static io.github.pellse.reactive.assembler.AssemblerBuilder.assemblerOf;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToMany;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToOne;
import static io.github.pellse.reactive.assembler.Rule.rule;
import static io.github.pellse.reactive.assembler.caching.CacheFactory.cached;
import static io.github.pellse.reactive.assembler.caching.AutoCacheFactoryBuilder.autoCacheBuilder;
import static io.github.pellse.reactive.assembler.caching.AutoCacheFactory.OnErrorMap.onErrorMap;
import static reactor.core.scheduler.Schedulers.newParallel;
import static java.time.Duration.*;
import static java.lang.System.Logger.Level.WARNING;
import static java.lang.System.getLogger;

var logger = getLogger("auto-cache-logger");

Flux<BillingInfo> billingInfoFlux = ... // From e.g. Debezium/Kafka, RabbitMQ, etc.;
Flux<OrderItem> orderItemFlux = ... // From e.g. Debezium/Kafka, RabbitMQ, etc.;

var assembler = assemblerOf(Transaction.class)
  .withCorrelationIdExtractor(Customer::customerId)
  .withAssemblerRules(
    rule(BillingInfo::customerId, oneToOne(cached(this::getBillingInfo,
      autoCacheBuilder(billingInfoFlux)
        .maxWindowSizeAndTime(100, ofSeconds(5))
        .errorHandler(error -> logger.log(WARNING, "Error in autoCache", error))
        .scheduler(newParallel("billing-info"))
        .maxRetryStrategy(50)
        .build()))),
    rule(OrderItem::customerId, oneToMany(OrderItem::id, cached(this::getAllOrders,
      autoCacheBuilder(orderItemFlux)
        .maxWindowSize(50)
        .errorHandler(onErrorMap(MyException::new)) // MyException: a user-defined exception type
        .scheduler(newParallel("order-item"))
        .backoffRetryStrategy(100, ofMillis(10))
        .build()))),
    Transaction::new)
  .build();
    
var transactionFlux = getCustomers()
  .window(3)
  .flatMapSequential(assembler::assemble);

By default, the cache is updated for every element of the incoming data stream, but cache updates can also be batched (e.g. with maxWindowSize() or maxWindowSizeAndTime() as shown above). This is useful for reducing network calls when updating a remote cache.
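
A hypothetical plain-Java sketch of that batching: items are buffered and written to a simulated remote cache with one putAll() per full window (plus a final flush), so the number of remote writes drops from one per item to one per batch. BatchedCacheWriter and its fields are illustrative only, and time-based windows are not modeled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of batched cache updates.
// The "remote" cache is simulated by a HashMap plus a write counter.
class BatchedCacheWriter<K, V> {

    final Map<K, V> remoteCache = new HashMap<>();
    int remoteWrites = 0;

    private final int maxWindowSize;
    private final Function<V, K> keyExtractor;
    private final List<V> buffer = new ArrayList<>();

    BatchedCacheWriter(int maxWindowSize, Function<V, K> keyExtractor) {
        this.maxWindowSize = maxWindowSize;
        this.keyExtractor = keyExtractor;
    }

    // Called for each element of the incoming stream
    void onNext(V item) {
        buffer.add(item);
        if (buffer.size() >= maxWindowSize) flush();
    }

    // Called when a window fills up, and once more when the stream completes
    void flush() {
        if (buffer.isEmpty()) return;
        Map<K, V> batch = new HashMap<>();
        for (V item : buffer) batch.put(keyExtractor.apply(item), item); // later items win
        remoteCache.putAll(batch); // one "network call" per batch
        remoteWrites++;
        buffer.clear();
    }
}
```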

Event Based Auto Caching

Assume the following custom domain events, which are not known to the Assembler Library:

sealed interface MyEvent<T> {
  T item();
}

record ItemUpdated<T>(T item) implements MyEvent<T> {}
record ItemDeleted<T>(T item) implements MyEvent<T> {}

record MyOtherEvent<T>(T value, boolean isAddOrUpdateEvent) {}

// E.g. Flux coming from a Change Data Capture/Kafka source
Flux<MyOtherEvent<BillingInfo>> billingInfoFlux = Flux.just(
  new MyOtherEvent<>(billingInfo1, true), new MyOtherEvent<>(billingInfo2, true),
  new MyOtherEvent<>(billingInfo2, false), new MyOtherEvent<>(billingInfo3, false));

// E.g. Flux coming from a Change Data Capture/Kafka source
Flux<MyEvent<OrderItem>> orderItemFlux = Flux.just(
  new ItemUpdated<>(orderItem11), new ItemUpdated<>(orderItem12), new ItemUpdated<>(orderItem13),
  new ItemDeleted<>(orderItem31), new ItemDeleted<>(orderItem32), new ItemDeleted<>(orderItem33));

Here is how autoCache() can be used to adapt those custom domain events to add, update or delete entries from the cache in real-time:

import io.github.pellse.reactive.assembler.Assembler;
import io.github.pellse.reactive.assembler.caching.CacheFactory.CacheTransformer;
import static io.github.pellse.reactive.assembler.AssemblerBuilder.assemblerOf;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToMany;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToOne;
import static io.github.pellse.reactive.assembler.Rule.rule;
import static io.github.pellse.reactive.assembler.caching.CacheFactory.cached;
import static io.github.pellse.reactive.assembler.caching.AutoCacheFactory.autoCache;
    
CacheTransformer<Long, BillingInfo, BillingInfo> billingInfoAutoCache =
  autoCache(billingInfoFlux, MyOtherEvent::isAddOrUpdateEvent, MyOtherEvent::value);

CacheTransformer<Long, OrderItem, List<OrderItem>> orderItemAutoCache =
  autoCache(orderItemFlux, ItemUpdated.class::isInstance, MyEvent::item);

Assembler<Customer, Flux<Transaction>> assembler = assemblerOf(Transaction.class)
  .withCorrelationIdExtractor(Customer::customerId)
  .withAssemblerRules(
    rule(BillingInfo::customerId, oneToOne(cached(this::getBillingInfo, billingInfoAutoCache))),
    rule(OrderItem::customerId, oneToMany(OrderItem::id, cached(this::getAllOrders, orderItemAutoCache))),
    Transaction::new)
  .build();

var transactionFlux = getCustomers()
  .window(3)
  .flatMapSequential(assembler::assemble);
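
The predicate/mapper pair passed to autoCache() can be understood through the following plain-Java sketch for the one-to-one case: events for which the predicate holds upsert the mapped item, and all other events presumably remove it. EventDrivenCache.apply() is a hypothetical helper; the one-to-many case, where a deletion should remove a single item from a customer's list, is not modeled.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical sketch of adapting custom events to cache operations (one-to-one case).
class EventDrivenCache {

    static <E, K, V> void apply(Map<K, V> cache, List<E> events,
                                Predicate<E> isAddOrUpdateEvent,
                                Function<E, V> itemMapper,
                                Function<V, K> keyExtractor) {
        for (E event : events) {
            V item = itemMapper.apply(event);   // e.g. MyOtherEvent::value
            K key = keyExtractor.apply(item);   // e.g. BillingInfo::customerId
            if (isAddOrUpdateEvent.test(event)) {
                cache.put(key, item);           // add or update
            } else {
                cache.remove(key);              // delete
            }
        }
    }
}
```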

Integration with non-reactive sources

A utility function, toPublisher(), is also provided to wrap non-reactive sources, which is useful e.g. when calling third-party synchronous APIs:

import reactor.core.publisher.Flux;
import io.github.pellse.reactive.assembler.Assembler;
import static io.github.pellse.reactive.assembler.AssemblerBuilder.assemblerOf;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToMany;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToOne;
import static io.github.pellse.reactive.assembler.Rule.rule;
import static io.github.pellse.reactive.assembler.QueryUtils.toPublisher;

List<BillingInfo> getBillingInfo(List<Long> customerIds); // non-reactive source
List<OrderItem> getAllOrders(List<Long> customerIds); // non-reactive source

Assembler<Customer, Flux<Transaction>> assembler = assemblerOf(Transaction.class)
  .withCorrelationIdExtractor(Customer::customerId)
  .withAssemblerRules(
    rule(BillingInfo::customerId, oneToOne(toPublisher(this::getBillingInfo))),
    rule(OrderItem::customerId, oneToMany(OrderItem::id, toPublisher(this::getAllOrders))),
    Transaction::new)
  .build();
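
To illustrate the general idea of adapting a blocking call, here is a plain-Java sketch that wraps a synchronous batch query so it runs on a supplied Executor and returns a CompletableFuture. This is a swapped-in technique: toPublisher() produces a reactive Publisher instead, and SyncToAsync.toAsync() is a hypothetical name.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;

// Hypothetical sketch: adapt a blocking batch query into an asynchronous one.
class SyncToAsync {

    static <ID, R> Function<List<ID>, CompletableFuture<List<R>>> toAsync(
            Function<List<ID>, List<R>> blockingQuery, Executor executor) {
        // The blocking call runs on the given executor, so the caller's thread is not blocked
        return ids -> CompletableFuture.supplyAsync(() -> blockingQuery.apply(ids), executor);
    }
}
```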

Kotlin Support

sealed interface MyEvent<T> {
  val item: T
}

data class ItemUpdated<T>(override val item: T) : MyEvent<T>
data class ItemDeleted<T>(override val item: T) : MyEvent<T>

// E.g. Flux coming from a Change Data Capture/Kafka source
val billingInfoFlux: Flux<MyEvent<BillingInfo>> = Flux.just(
  ItemUpdated(billingInfo1), ItemUpdated(billingInfo2),
  ItemUpdated(billingInfo3), ItemDeleted(billingInfo3))

// E.g. Flux coming from a Change Data Capture/Kafka source
val orderItemFlux: Flux<MyEvent<OrderItem>> = Flux.just(
  ItemUpdated(orderItem31), ItemUpdated(orderItem32), ItemUpdated(orderItem33),
  ItemDeleted(orderItem31), ItemDeleted(orderItem32), ItemDeleted(orderItem33))

import io.github.pellse.reactive.assembler.kotlin.assembler
import io.github.pellse.reactive.assembler.kotlin.cached
import io.github.pellse.reactive.assembler.RuleMapper.oneToMany
import io.github.pellse.reactive.assembler.RuleMapper.oneToOne
import io.github.pellse.reactive.assembler.Rule.rule
import io.github.pellse.reactive.assembler.caching.CacheFactory.cache
import io.github.pellse.reactive.assembler.caching.AutoCacheFactory.autoCache
import io.github.pellse.reactive.assembler.caching.AutoCacheFactoryBuilder.autoCacheBuilder

val assembler = assembler<Transaction>()
  .withCorrelationIdExtractor(Customer::customerId)
  .withAssemblerRules(
    rule(BillingInfo::customerId, oneToOne(::getBillingInfo.cached(cache(::sortedMapOf),
      autoCache(billingInfoFlux, ItemUpdated::class::isInstance) { it.item }))),
    rule(OrderItem::customerId, oneToMany(OrderItem::id, ::getAllOrders.cached(
      autoCacheBuilder(orderItemFlux, ItemUpdated::class::isInstance, MyEvent<OrderItem>::item)
        .maxWindowSize(3)
        .build()))), 
    ::Transaction)
  .build()

What's Next?

See the list of issues for improvements planned in the near future.