Skip to main content

from

L'exécution d'un stream démarre avec la clause from()

Lorsqu'un stream est composé de plusieurs flux, chaque clause from() doit être exécutée sur une URL distincte

Patterns

Connecteur: données entrantes / sortantes

Usage

interface org.mddynamics.network.stream.LinkStream
    public LinkStream from(String label, String url, [(LinkStream stream, Event event) -> { <lambda> }],..) throws StreamDefinitionException;

@param label: libellé de la lambda

@param url: URL de distribution de la lambda

@param stream: instance du stream

@param event: évenement ou message reçu par la lambda

@function lambda: implémentation de la fonction lambda. Doit retouner un objet sérialisable comme input pour les traitements suivants de l'algorithme. Si la lambda retourne null alors les clauses suivantes ne sont pas exécutées.

Lorsque plusieurs fonctions lambda sont définies, elles sont exécutées séquentiellement sur les mêmes données reçues en entrée et tous les résultats sont transmis vers la clause suivante de l'algorithme (à la façon d'une fonction "JOIN")

caution

Si la lambda retourn un itérable (implémente l'interface java.lang.Iterable) alors chaque élément est transmis en streaming (au fil de l'eau); dans le cas contraire, l'objet entier est transmis vers la clause suivante.

Exemple de code


@Override
public void build() throws Exception {
// Flux synchrone (Request/reply): Exposition d'une api REST
make("power moniroting api")
.from("http", "www.mddynamics.fr/app/api", (stream, request) -> {
// transmet la requête après validation
return valid(request);
})
.when("400", (stream, request)-> request.matches("status", "ERROR"))
.reply(...)

.oterwise("cas nominal")
.read("data", "www.mddynamics.fr/app/datastore...", (stream, request) -> {})
.reply(...);

// Flux asynchrone (Publish/subscribe): Collecte des données de production
make("power data collect")
.from("collect", "www.mddynamics.fr/app/data", (stream, data) -> {
// transmet chaque métrique à distribuer sur le réseau de datastores
return data.resource().list("metrics");
})
.store("metrics", "www.mddynamics.fr/app/data?mddynamics.id=/name", (stream, metric) -> ...);
}