Skip to content

Commit 83d5ba0

Browse files
committed
Add Project Reactor extension
1 parent 6906a9a commit 83d5ba0

File tree

21 files changed

+1552
-22
lines changed

21 files changed

+1552
-22
lines changed

MIGRATING.md

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
# Migration Guide
2+
3+
The Couchbase Analytics Java SDK is designed to work specifically with Couchbase Enterprise Analytics.
4+
It is a successor to the analytics API from the general Couchbase Java SDK, which we'll refer to here as the "operational" SDK.
5+
6+
This section offers advice on how to migrate code from the operational SDK ("before") to the analytics SDK ("after").
7+
8+
## Class names
9+
10+
The analytics SDK omits the "Analytics" prefix from several class names in favor of the more general "Query" prefix.
11+
12+
| Before | After |
13+
|---------------------|-------------------------------|
14+
| `AnalyticsResult` | `QueryResult` |
15+
| `AnalyticsOptions` | `QueryOptions` |
16+
| `AnalyticsMetaData` | `QueryMetadata` (lowercase d) |
17+
| `AnalyticsWarning` | `QueryWarning` |
18+
19+
The operational SDK's reactive API does not have a direct analogue in the base analytics SDK.
20+
An API for integrating with Project Reactor is available as an extension library.
21+
Note the prefix "Reactive" changes to "Reactor".
22+
23+
| Before | After (with extension library) |
24+
|-----------------------|--------------------------------|
25+
| `ReactiveQueryResult` | `ReactorQueryResult` |
26+
27+
## Method names
28+
29+
| Before | After |
30+
|------------------|------------------------------------------|
31+
| `analyticsQuery` | `executeQuery` / `executeStreamingQuery` |
32+
33+
34+
## Query options
35+
36+
With the operational SDK, the caller creates an instance of `AnalayticsOptions` and configures it.
37+
The operational SDK expected you to pass positional parameters as a `JsonArray`, and named parameters as a `JsonObject`.
38+
39+
With the analytics SDK, options are specified via a callback that modifies an instance of `QueryOptions` created by the SDK.
40+
The analytics SDK takes positional parameters as a `List`, and named parameters as a `Map`.
41+
42+
_Before:_
43+
44+
```java
45+
AnalyticsResult result = operationalCluster
46+
.analyticsQuery(
47+
"SELECT ? AS greeting",
48+
AnalyticsOptions.analyticsOptions()
49+
.readonly(true)
50+
.parameters(JsonArray.from("hello world"))
51+
);
52+
```
53+
54+
_After:_
55+
56+
```java
57+
QueryResult result = analyticsCluster
58+
.executeQuery(
59+
"SELECT ? AS greeting",
60+
options -> options
61+
.readOnly(true) // uppercase "O"
62+
.parameters(List.of("hello world"))
63+
);
64+
```
65+
66+
## JsonObject and JsonArray
67+
68+
These classes are present in both the operational and analytics SDKs, but in different packages.
69+
The two versions have similar methods, but are not interchangeable.
70+
71+
The version to use with the analytics SDK is in package `com.couchbase.analytics.client.java.json`.
72+
73+
If you need to pass JSON between the analytics and operational SDKs, first convert the JSON to a byte array using the `toBytes()` method.
74+
Then use the other SDK's version to parse the JSON using the `fromJson(byte[])` method.
75+
76+
77+
## Converting row values
78+
79+
With the operational SDK, query result rows are accessed by calling `AnalyticsResult.rowsAs(<type>)`.
80+
This method returns a new list where each result row is mapped to an instance of the specified type.
81+
82+
The analytics SDK represents result rows differently.
83+
It introduces a new `Row` class that represents a single result row.
84+
The `queryResult.rows()` method returns a `List<Row>`.
85+
To convert a row to an instance of some type, call `row.as(<type>)`.
86+
If null is a valid value, call `row.asNullable(<type>)` instead.
87+
88+
Unlike the operational SDK, the analytics SDK does not have a dedicated method for converting a row to a `JsonObject`.
89+
Instead, use `row.as(JsonObject.class)`.
90+
(Make sure to use the version of `JsonObject` from the analytics SDK instead of the operational SDK, otherwise the conversion will fail.)
91+
92+
_Before:_
93+
94+
```java
95+
import com.couchbase.client.java.json.JsonObject;
96+
```
97+
98+
```java
99+
AnalyticsResult result = operationalCluster
100+
.analyticsQuery("SELECT 'hello world' AS greeting");
101+
102+
JsonObject obj = result.rowsAsObject().getFirst();
103+
System.out.println(obj.getString("greeting"));
104+
```
105+
106+
_After:_
107+
108+
```java
109+
import com.couchbase.analytics.client.java.json.JsonObject;
110+
```
111+
112+
```java
113+
QueryResult result = analyticsCluster
114+
.executeQuery("SELECT 'hello world' AS greeting");
115+
116+
JsonObject obj = result.rows().getFirst().as(JsonObject.class);
117+
System.out.println(obj.getString("greeting"));
118+
```
119+
120+
## Streaming result rows
121+
122+
In the operational SDK, the only way to stream result rows from the server is to use the Reactive API.
123+
The analytics SDK adds a safe and convenient way to stream results rows without the complexity of reactive programming.
124+
125+
_Before:_
126+
127+
```java
128+
Mono<ReactiveAnalyticsResult> resultMono = operationalCluster.reactive()
129+
.analyticsQuery("SELECT RAW i FROM ARRAY_RANGE(0, 10) as i");
130+
131+
resultMono.flatMapMany(result -> result.rowsAs(Integer.class))
132+
.doOnNext(System.out::println)
133+
.blockLast();
134+
```
135+
136+
_After:_
137+
138+
```java
139+
analyticsCluster.executeStreamingQuery(
140+
"SELECT RAW i FROM ARRAY_RANGE(0, 10) as i",
141+
row -> System.out.println(row.as(Integer.class))
142+
);
143+
```
144+
145+
To aid migration of existing reactive codebases, and to support integrations with other reactive components, the analytics SDK has an optional extension library that adds support for Project Reactor.
146+
147+
```xml
148+
<dependency>
149+
<groupId>com.couchbase.client</groupId>
150+
<artifactId>couchbase-analytics-java-client-reactor</artifactId>
151+
<version>x.y.z</version>
152+
</dependency>
153+
```
154+
155+
_After (with Reactor extension library):_
156+
157+
```java
158+
var reactor = ReactorQueryable.from(analyticsClusterOrScope);
159+
160+
Mono<ReactorQueryResult> resultMono = reactor
161+
.executeQuery("SELECT RAW i FROM ARRAY_RANGE(0, 10) as i");
162+
163+
resultMono.flatMapMany(ReactorQueryResult::rows)
164+
.map(row -> row.as(Integer.class))
165+
.doOnNext(System.out::println)
166+
.blockLast();
167+
```

README.md

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,3 +29,43 @@ JVM clients for Couchbase Enterprise Analytics
2929
</repository>
3030
</repositories>
3131
```
32+
33+
## Example Usage
34+
35+
Start by creating a `Cluster` instance:
36+
37+
```java
38+
Cluster cluster = Cluster.newInstance(
39+
connectionString, // like "https://..."
40+
Credential.of(username, password),
41+
// The third parameter is optional.
42+
// This example sets the default query timeout to 2 minutes.
43+
clusterOptions -> clusterOptions
44+
.timeout(it -> it.queryTimeout(Duration.ofMinutes(2)))
45+
);
46+
```
47+
48+
A `Cluster` instance is thread-safe.
49+
For best performance, create a single instance and share it.
50+
51+
To execute a query whose results fit in memory:
52+
53+
```java
54+
QueryResult result = cluster.executeQuery(
55+
"SELECT `hello world` as greeting"
56+
);
57+
58+
for (Row row : result.rows()) {
59+
String greeting = row.as(ObjectNode.class)
60+
.path("greeting")
61+
.textValue();
62+
63+
System.out.println(greeting);
64+
}
65+
```
66+
67+
For more examples, including how to handle large result sets, see the source code in the [Maven project template](couchbase-analytics-java-client/examples/maven-project-template).
68+
69+
## Migrating
70+
71+
See the [Migration Guide](MIGRATING.md) if you're migrating from the Couchbase Java SDK.

couchbase-analytics-java-client/examples/maven-project-template/pom.xml

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
<description>Examples project for Couchbase Analytics Java SDK</description>
1313

1414
<properties>
15+
<couchbase.client.version>1.0.0-SNAPSHOT</couchbase.client.version>
16+
1517
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
1618
<maven.compiler.release>21</maven.compiler.release>
1719
</properties>
@@ -34,7 +36,14 @@
3436
<dependency>
3537
<groupId>com.couchbase.client</groupId>
3638
<artifactId>couchbase-analytics-java-client</artifactId>
37-
<version>1.0.0-SNAPSHOT</version>
39+
<version>${couchbase.client.version}</version>
40+
</dependency>
41+
42+
<!-- For optional integration with Project Reactor -->
43+
<dependency>
44+
<groupId>com.couchbase.client</groupId>
45+
<artifactId>couchbase-analytics-java-client-reactor</artifactId>
46+
<version>${couchbase.client.version}</version>
3847
</dependency>
3948

4049
<!-- Specify your favorite SLF4J 2 binding -->

couchbase-analytics-java-client/examples/maven-project-template/src/main/java/com/example/couchbase/analytics/Example.java

Lines changed: 87 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,17 @@
1717
package com.example.couchbase.analytics;
1818

1919
import com.couchbase.analytics.client.java.Cluster;
20+
import com.couchbase.analytics.client.java.ClusterOptions;
2021
import com.couchbase.analytics.client.java.Credential;
22+
import com.couchbase.analytics.client.java.QueryOptions;
2123
import com.couchbase.analytics.client.java.QueryResult;
24+
import com.couchbase.analytics.client.java.Queryable;
25+
import com.couchbase.analytics.client.java.Row;
26+
import com.couchbase.analytics.client.java.codec.Deserializer;
27+
import com.couchbase.analytics.client.java.extension.reactor.ReactorQueryResult;
28+
import com.couchbase.analytics.client.java.extension.reactor.ReactorQueryable;
29+
import com.example.couchbase.analytics.reactor.ReactorExample;
30+
import reactor.core.publisher.Flux;
2231

2332
import java.time.Duration;
2433
import java.util.List;
@@ -38,27 +47,84 @@ public static void main(String[] args) {
3847
.timeout(it -> it.queryTimeout(Duration.ofMinutes(2)))
3948
)) {
4049

41-
// Buffered query. All rows must fit in memory.
42-
QueryResult result = cluster.executeQuery(
43-
"select ?=1",
44-
options -> options
45-
.readOnly(true)
46-
.parameters(List.of(1))
47-
);
48-
result.rows().forEach(row -> System.out.println("Got row: " + row));
49-
50-
// Alternatively --
51-
52-
// Streaming query. Rows are processed one-by-one
53-
// as they arrive from server.
54-
cluster.executeStreamingQuery(
55-
"select ?=1",
56-
row -> System.out.println("Got row: " + row),
57-
options -> options
58-
.readOnly(true)
59-
.parameters(List.of(1))
60-
);
50+
bufferedQueryExample(cluster);
51+
52+
streamingQueryExample(cluster);
53+
54+
dataBindingExample(cluster);
55+
56+
nullRowExample(cluster);
57+
58+
reactorQueryExample(cluster);
6159
}
6260
}
63-
}
6461

62+
/**
63+
* Executes a query, buffering all result rows in memory.
64+
*/
65+
static void bufferedQueryExample(Queryable clusterOrScope) {
66+
QueryResult result = clusterOrScope.executeQuery(
67+
"select ?=1",
68+
options -> options
69+
.readOnly(true)
70+
.parameters(List.of(1))
71+
);
72+
result.rows().forEach(row -> System.out.println("Got row: " + row));
73+
}
74+
75+
/**
76+
* Executes a query, processing rows one-by-one
77+
* as they arrive from server.
78+
*/
79+
static void streamingQueryExample(Queryable clusterOrScope) {
80+
clusterOrScope.executeStreamingQuery(
81+
"select ?=1",
82+
row -> System.out.println("Got row: " + row),
83+
options -> options
84+
.readOnly(true)
85+
.parameters(List.of(1))
86+
);
87+
}
88+
89+
/**
90+
* Converts a result row to a user-defined type using the default Jackson
91+
* {@link Deserializer}.
92+
*
93+
* @see ClusterOptions#deserializer(Deserializer)
94+
* @see QueryOptions#deserializer(Deserializer)
95+
*/
96+
static void dataBindingExample(Queryable clusterOrScope) {
97+
record MyRowPojo(String greeting) {}
98+
99+
QueryResult result = clusterOrScope.executeQuery(
100+
"SELECT 'hello world' AS greeting"
101+
);
102+
MyRowPojo resultRow = result.rows().getFirst().as(MyRowPojo.class);
103+
System.out.println(resultRow.greeting);
104+
}
105+
106+
/**
107+
* Calls {@link Row#asNullable} because null is an expected result row value.
108+
*/
109+
static void nullRowExample(Queryable clusterOrScope) {
110+
QueryResult result = clusterOrScope.executeQuery("SELECT RAW null");
111+
String nullableString = result.rows().getFirst().asNullable(String.class);
112+
System.out.println(nullableString);
113+
}
114+
115+
/**
116+
* Executes a query using the optional Project Reactor extension library.
117+
* <p>
118+
* Requires adding {@code com.couchbase.client:couchbase-analytics-java-client-reactor} as a dependency of your project.
119+
* <p>
120+
* See {@link ReactorExample} for more examples.
121+
*/
122+
static void reactorQueryExample(Queryable clusterOrScope) {
123+
var reactor = ReactorQueryable.from(clusterOrScope);
124+
125+
Flux<Row> resultRows = reactor.executeQuery("SELECT 1")
126+
.flatMapMany(ReactorQueryResult::rows);
127+
128+
System.out.println(resultRows.blockLast());
129+
}
130+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Copyright 2025 Couchbase, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
@NullMarked
18+
package com.example.couchbase.analytics;
19+
20+
import org.jspecify.annotations.NullMarked;

0 commit comments

Comments
 (0)