WARNING Fn Flow is no longer maintained or supported.
Fn Flow lets you build long-running, reliable and scalable functions using Fn that only consume compute resources when they have work to do and are written purely in code. Flow supports building complex parallel processes that are readable, testable (including via unit testing) using standard programming tools. Flow empowers you to build workflows as distributed programs that are as complex as you need them to be and supports a rich set of concurrency primitives including fork-join, chaining, delays and error handling.
Most workflow systems strongly separate the coordination of a process from its execution, typically using declarative workflow programming languages like state machines to execute the workflow in one system and the units of work (e.g. steps, stages) in another.
This creates several new problems that Fn Flow aims to overcome:
The following Java Fn Flow function coordinates five other functions to:
scraper
function)detect-plates
),draw
) then send an alert containing the rendered image (alert
)complete
)public void handleRequest(ScrapeReq input) throws Exception {
FlowFuture<ScrapeResp> scrapes = currentFlow().invokeFunction("./scraper", input, ScrapeResp.class);
scrapes.thenCompose(resp -> {
List<FlowFuture<?>> pendingTasks = resp.result
.stream()
.map(scrapeResult -> {
String id = scrapeResult.id;
return currentFlow()
.invokeFunction("./detect-plates",
new DetectPlateReq(scrapeResult.image_url, "us"), DetectPlateResp.class)
.thenCompose((plateResp) -> {
if (!plateResp.got_plate) {
return currentFlow().completedValue(null);
}
return currentFlow()
.invokeFunction("./draw",
new DrawReq(id, scrapeResult.image_url, plateResp.rectangles,"300x300"), DrawResp.class)
.thenCompose((drawResp) -> currentFlow()
.invokeFunction("./alert", new AlertReq(plateResp.plate, drawResp.image_url)));
});
}).collect(Collectors.toList());
return currentFlow()
.allOf(pendingTasks.toArray(new FlowFuture[pendingTasks.size()]))
.whenComplete((v, throwable) -> {
if (throwable != null) {
log.info("Scraping completed with at least one error", throwable);
} else {
currentFlow()
.invokeFunction("./complete",CompleteResult("Scraped " + pendingTasks.size() + " Images")));
}
});;
});
}
While the above program can be written and reasoned about as a single method, (and tested using a JUnit rule) it is in fact executed by braking each stage of the computation down into Fn Functions, each of which runs as an independent Fn call - for instance the results of 'detect-plates' may be processed on one or more different containers. Functions like 'detect-plates' may take a while to run and when they are running none of the surrounding code blocks are consuming resources in the platform.
Flow Functions are currently supported in Java but the platform is not language specific and new language bindings can be build by implementing the function side of the Flow API. We are working on adding support for JavaScript, Python and Go.
To find out how to use Fn Flow in Java read the user guide.
The Flow Service retains a trace of each Flow's execution and can publish the real time state of an ongoing flow. This can be used to diagnose errors in flows (in a similar way to using a stack trace in a debugger) - We have developed an experimental UI that shows this data in real time and lets you backtrack to the source of problems.
Make sure the functions server is running
$ fn start ....
time="2017-09-16T22:04:49Z" level=info msg="available memory" ram=1590210560
______
/ ____/___
/ /_ / __ \
/ __/ / / / /
/_/ /_/ /_/
time="2017-09-16T22:04:49Z" level=info msg="Serving Functions API on address `:8080`"
Set FNSERVER_IP to the IP address of the Fn Server:
FNSERVER_IP=$(docker inspect --type container -f '{{.NetworkSettings.IPAddress}}' fnserver)
Then run the Flow Service:
docker run --rm -d \
-p 8081:8081 \
-e API_URL="http://$FNSERVER_IP:8080/invoke" \
-e no_proxy=$FNSERVER_IP \
--name flowserver \
fnproject/flow:latest
Configure via the environment
Env | Default | Usage |
---|---|---|
API_URL | http://localhost:8080 | sets the FN API endpoint for outbound invocations |
DB_URL | sqlite3://./data/flow.db | DB url, you may also use "inmem:/" for in memory storage |
LISTEN | :8081 | listen host/port (overrides PORT) |
Also see our Flow UI
Please see CONTRIBUTING.md.