**docs/src/content/docs/flow-control/reducer.md** (+89 -11)
@@ -24,7 +24,7 @@ The `Reducer` middleware is an essential flow control mechanism of Project Lakec
At its core, this middleware allows pipeline builders to group relevant documents into a single semantic envelope and perform combined operations on them. In combination with other middlewares, this unlocks many use cases: for example, aggregating multiple audio files to concatenate them, zipping a collection of documents on the fly, or inserting a set of subtitles into a video.
- The `Reducer` middleware can aggregate multiple documents based on specific strategies which we will document below.
+ > The `Reducer` middleware can aggregate multiple documents based on specific strategies which we document below.
<br />
@@ -50,7 +50,7 @@ We say that a new pipeline execution is triggered when a new video is uploaded t
#### Time Windows
- The time window strategy makes it possible to reduce events belonging to the same `chainId` within a specific time window. It defines a static time window, comprised between 1 second and 48 hours, in which all events belonging to the same `chainId` are aggregated together. When the time window reaches its end, the aggregated events are reduced into a single composite event, and forwarded to the next middlewares in the pipeline.
+ The time window strategy reduces events belonging to the same `chainId` within a user-defined time window, which can range from 1 second to 48 hours. When the time window reaches its end, the aggregated events are reduced into a single composite event and forwarded to the next middlewares in the pipeline.
This strategy is a good fit for scenarios where you don't necessarily know how many documents will be produced by the middlewares preceding the `Reducer` step.
@@ -60,7 +60,7 @@ It starts aggregating documents belonging to the same `chainId` when the first d
##### Jitter
- The `TimeWindowStrategy` also allows you to optionally specify a specific jitter which consists of a random number between zero and the specified jitter value. Using a jitter can be very useful to smoothen the aggregation process across multiple `chainId`.
+ The `TimeWindowStrategy` also allows you to optionally specify a jitter, which is a random delay between zero and a value of your choice. Using a jitter can be useful to smooth the aggregation process across multiple `chainId` groups.
For example, if your time window is 10 minutes and you add a jitter of 30 seconds, each reduce operation will occur after 10 minutes plus a random value between zero and 30 seconds.
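The delay computation described above can be sketched in plain TypeScript. This is illustrative only: the function name and shape are assumptions for this example, not the middleware's internals.

```typescript
// Illustrative sketch: computes the effective delay before the reduce
// operation fires for a `chainId` group. Not the actual implementation.
function effectiveDelaySeconds(windowSeconds: number, jitterSeconds = 0): number {
  // The jitter is a random value between zero and the specified bound.
  const jitter = Math.random() * jitterSeconds;
  return windowSeconds + jitter;
}

// A 10-minute window with a 30-second jitter fires somewhere between
// 600 and 630 seconds after aggregation starts for a given `chainId`.
const delay = effectiveDelaySeconds(10 * 60, 30);
```

Because each `chainId` group draws its own random jitter, reduce operations that would otherwise fire simultaneously are spread over the jitter interval.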
@@ -101,15 +101,17 @@ class Stack extends cdk.Stack {
#### Static Counter
- The static counter strategy allows you to reduce all events belonging to the same `chainId`, based on a static counter. It allows you to specify the number of documents to aggregate together before reducing them into a single event.
+ The static counter strategy reduces all events belonging to the same `chainId` based on a static counter. It allows you to specify the number of documents to aggregate before reducing them into a single event.
This strategy is a good fit when you know the exact number of documents that you expect to be reduced.
- For example, let's say that you want to translate a document in french, english, and spanish using the [Translate Text Processor](/project-lakechain/text-processing/translate-text-processor), and reduce the translated documents back together to zip them. In this case, you know that you will be expecting 3 documents associated with the 3 translated languages.
+ For example, let's say that you want to translate a document into French, English, and Spanish using the [Translate Text Processor](/project-lakechain/text-processing/translate-text-processor), and reduce the translated documents back together to zip them. In this case, you know that you will be expecting exactly 3 documents, one per translated language.
##### Unmatched Events
- As the reducer awaits for the static count condition to be met, it will aggregate documents for a period of 48 hours. If the condition is unmet after this period, the aggregated documents will be dismissed, and no event will be created.
+ While the reducer waits for the static count condition to be met, it aggregates documents for a period of up to 48 hours. If the counter is not reached within this period, the aggregated documents are dismissed and no event is created.
+
+ Similarly, if a reduce operation has already occurred for a given chain identifier, any subsequent document arriving after the count condition has been met will be dismissed.
##### Usage
@@ -137,7 +139,76 @@ class Stack extends cdk.Stack {
}
```
- <br>
+ <br />
+
+ ---
+
+ #### Conditional Strategy
+
+ The conditional strategy reduces events based on a custom user-provided condition. It allows you to define a [funclet](/project-lakechain/guides/funclets) or a Lambda function that gets called whenever a new document belonging to a given `chainId` is aggregated. This conditional expression defines when the aggregated events should be reduced.
+
+ This strategy is a good fit when you want to control the reduce process based on a specific condition. For example, if you want to reduce a collection of events based on the metadata of the documents, or even based on a third-party API, the conditional strategy makes that possible.
+
+ ##### Unmatched Events
+
+ This strategy evaluates aggregated documents for a duration of up to 48 hours. If the condition is unmet after this period, the aggregated documents are dismissed.
+
+ If a reduce operation has already occurred for a given chain identifier, any subsequent document with the same chain identifier arriving after the condition has been met will be dismissed.
+
+ ##### Usage
+
+ To reduce events using the `ConditionalStrategy`, you must import and instantiate the `Reducer` middleware as part of your pipeline.
+
+ > 💁 Below is an example showcasing how to instantiate the reducer using the `ConditionalStrategy` with a custom condition.
Funclet expressions use the power of a full programming language to express complex reduce conditional expressions. They are asynchronous and can be defined as TypeScript named functions, anonymous functions, or arrow functions.
+
+ A reduce conditional funclet takes 2 arguments: a `CloudEvent` describing the document being handled by the reducer, and a collection of the events stored so far, excluding the received event. It must return a promise resolving to a boolean, `true` if the reduce operation should occur, `false` otherwise.
@@ -147,7 +218,7 @@ The architecture implemented by this middleware depends on the selected strategy
#### `TimeWindowStrategy`
- This strategy implements a serverless aggregation architecture based on DynamoDB for document event aggregation, and the [EventBridge Scheduler](https://docs.aws.amazon.com/scheduler/latest/UserGuide/what-is-scheduler.html) service for scheduling the execution of the reducer for each `chainId` group of events.
+ This strategy implements a serverless aggregation architecture based on DynamoDB for document event aggregation, and the [EventBridge Scheduler](https://docs.aws.amazon.com/scheduler/latest/UserGuide/what-is-scheduler.html) service for scheduling the execution of the reducer for each `chainId` group of events when the time window is reached.
The conditional strategy implements a serverless aggregation architecture based on DynamoDB as the document aggregator, and leverages an event-driven approach to evaluate a conditional expression for each received document belonging to the same `chainId`.
@@ -183,11 +260,12 @@ This strategy also implements a serverless aggregation architecture based on Dyn
| ----- | ----------- |
| `CPU` | This middleware only supports CPU compute. |
- <br>
+ <br />
---
### 📖 Examples
- [Building a Generative Podcast](https://github.com/awslabs/project-lakechain/tree/main/examples/end-to-end-use-cases/building-a-podcast-generator) - Builds a pipeline for creating a generative weekly AWS news podcast.
- [Building a Video Chaptering Service](https://github.com/awslabs/project-lakechain/tree/main/examples/end-to-end-use-cases/building-a-video-chaptering-service) - Builds a pipeline for automatic video chaptering generation.
+ - [Bedrock Translation Pipeline](https://github.com/awslabs/project-lakechain/tree/main/examples/simple-pipelines/text-translation-pipelines/bedrock-translation-pipeline) - Translates documents using a large-language model hosted on Amazon Bedrock.
**docs/src/content/docs/text-processing/translate-text-processor.md** (+1 -1)
@@ -171,4 +171,4 @@ When using asynchronous translations, this middleware uses an event-driven archi
### 📖 Examples
- - [Text Translation Pipeline](https://github.com/awslabs/project-lakechain/tree/main/examples/simple-pipelines/text-translation-pipeline/) - An example showcasing how to translate documents using Amazon Translate.
+ - [Text Translation Pipeline](https://github.com/awslabs/project-lakechain/tree/main/examples/simple-pipelines/text-translation-pipelines/translate-pipeline/) - An example showcasing how to translate documents using Amazon Translate.
**examples/simple-pipelines/archive-processing-pipelines/deflate-pipeline/README.md** (+2 -2)
@@ -31,12 +31,12 @@ The following requirements are needed to deploy the infrastructure associated wi
- You need access to a development AWS account.
- [AWS CDK](https://docs.aws.amazon.com/cdk/latest/guide/getting_started.html#getting_started_install) is required to deploy the infrastructure.
- [Docker](https://docs.docker.com/get-docker/) is required to be running to build middlewares.
- - [Node.js](https://nodejs.org/en/download/) v18+ and NPM.
+ - [Node.js](https://nodejs.org/en/download/) v20+ and NPM.
- [Python](https://www.python.org/downloads/) v3.8+ and [Pip](https://pip.pypa.io/en/stable/installation/).
## 🚀 Deploy
- Head to the directory [`examples/simple-pipelines/archive-processing/deflate-pipeline`](/examples/simple-pipelines/archive-processing/deflate-pipeline) in the repository and run the following commands to build the example:
+ Head to the directory [`examples/simple-pipelines/archive-processing-pipelines/deflate-pipeline`](/examples/simple-pipelines/archive-processing-pipelines/deflate-pipeline) in the repository and run the following commands to build the example:
**examples/simple-pipelines/archive-processing-pipelines/inflate-pipeline/README.md** (+2 -2)
@@ -23,12 +23,12 @@ The following requirements are needed to deploy the infrastructure associated wi
- You need access to a development AWS account.
- [AWS CDK](https://docs.aws.amazon.com/cdk/latest/guide/getting_started.html#getting_started_install) is required to deploy the infrastructure.
- [Docker](https://docs.docker.com/get-docker/) is required to be running to build middlewares.
- - [Node.js](https://nodejs.org/en/download/) v18+ and NPM.
+ - [Node.js](https://nodejs.org/en/download/) v20+ and NPM.
- [Python](https://www.python.org/downloads/) v3.8+ and [Pip](https://pip.pypa.io/en/stable/installation/).
## 🚀 Deploy
- Head to the directory [`examples/simple-pipelines/inflate-pipeline`](/examples/simple-pipelines/inflate-pipeline) in the repository and run the following commands to build the example:
+ Head to the directory [`examples/simple-pipelines/archive-processing-pipelines/inflate-pipeline`](/examples/simple-pipelines/archive-processing-pipelines/inflate-pipeline) in the repository and run the following commands to build the example:
**examples/simple-pipelines/data-extraction-pipelines/metadata-extraction-pipeline/README.md** (+2 -2)
@@ -23,12 +23,12 @@ The following requirements are needed to deploy the infrastructure associated wi
- You need access to a development AWS account.
- [AWS CDK](https://docs.aws.amazon.com/cdk/latest/guide/getting_started.html#getting_started_install) is required to deploy the infrastructure.
- [Docker](https://docs.docker.com/get-docker/) is required to be running to build middlewares.
- - [Node.js](https://nodejs.org/en/download/) v18+ and NPM.
+ - [Node.js](https://nodejs.org/en/download/) v20+ and NPM.
- [Python](https://www.python.org/downloads/) v3.8+ and [Pip](https://pip.pypa.io/en/stable/installation/).
## 🚀 Deploy
- Head to the directory [`examples/simple-pipelines/metadata-extraction-pipeline`](/examples/simple-pipelines/metadata-extraction-pipeline) in the repository and run the following commands to build the example:
+ Head to the directory [`examples/simple-pipelines/data-extraction-pipelines/metadata-extraction-pipeline`](/examples/simple-pipelines/data-extraction-pipelines/metadata-extraction-pipeline) in the repository and run the following commands to build the example: