Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,10 @@ import io.netty.handler.codec.http.HttpServerCodec;

import io.suboptimal.connectjava.codec.protobuf.ConnectProtobufCodecs;
import io.suboptimal.connectjava.model.*;
import io.suboptimal.connectjava.protocol.*;
import io.suboptimal.connectjava.protocol.server.ConnectCorsParameters;
import io.suboptimal.connectjava.protocol.server.ConnectServerProtocol;
import io.suboptimal.connectjava.protocol.server.ConnectServerProtocolConfig;
import io.suboptimal.connectjava.protocol.server.ConnectServerProtocolParameters;

import java.util.Map;

Expand All @@ -115,18 +118,18 @@ ConnectServiceDefinition greeter = new ConnectServiceDefinition(
/* idempotent — also reachable via Unary-GET */ true)),
/* optional descriptor for introspection */ null);

ConnectProtocolConfig config = ConnectProtocolConfig
ConnectServerProtocolConfig config = ConnectServerProtocolConfig
.builder(
Map.of(greeter.serviceName(), greeter),
GreeterCallHandler::new, // ConnectCallHandlerFactory
new ConnectProtocolParameters(
new ConnectServerProtocolParameters(
/* maxRequestBytes */ 4 * 1024 * 1024,
/* maxFrameBytes */ 1 * 1024 * 1024,
ConnectCorsParameters.disabled()),
ConnectProtobufCodecs.defaults()) // proto + proto-json codecs
.build();

ConnectProtocol protocol = new ConnectProtocol(config);
ConnectServerProtocol protocol = new ConnectServerProtocol(config);

ChannelInitializer<Channel> http1Initializer = new ChannelInitializer<>() {
@Override
Expand Down Expand Up @@ -347,13 +350,13 @@ serves Connect alongside any other HTTP/1.1 or HTTP/2 protocol you implement,
with ALPN, H2C prior-knowledge, and H2C upgrade negotiation done for you:

```java
import io.suboptimal.connectjava.protocol.ConnectProtocol;
import io.suboptimal.connectjava.protocol.server.ConnectServerProtocol;
import io.suboptimal.nettymultiprotocol.AppChannelConfigurer;
import io.suboptimal.nettymultiprotocol.AppProtocol;
import io.suboptimal.nettymultiprotocol.AppProtocolRegistry;
import io.suboptimal.nettymultiprotocol.NettyMultiprotocol;

ConnectProtocol connect = new ConnectProtocol(connectConfig);
io.suboptimal.connectjava.protocol.server.ConnectServerProtocol connect = new ConnectServerProtocol(connectConfig);

AppProtocol connectAsApp = new AppProtocol() {
@Override public AppChannelConfigurer http1() { return connect.http1()::configure; }
Expand Down
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
<maven-compiler-plugin.version>3.15.0</maven-compiler-plugin.version>
<maven-surefire-plugin.version>3.2.5</maven-surefire-plugin.version>
<protobuf-maven-plugin.version>3.1.2</protobuf-maven-plugin.version>
<jetbrains-annotations.version>24.1.0</jetbrains-annotations.version>

<central-publishing-maven-plugin.version>0.7.0</central-publishing-maven-plugin.version>
<flatten-maven-plugin.version>1.6.0</flatten-maven-plugin.version>
Expand Down Expand Up @@ -88,6 +89,12 @@
<version>${jspecify.version}</version>
</dependency>

<dependency>
<groupId>org.jetbrains</groupId>
<artifactId>annotations</artifactId>
<version>${jetbrains-annotations.version}</version>
</dependency>

<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@

import io.suboptimal.connectjava.model.ConnectMethodDefinition;
import io.suboptimal.connectjava.model.ConnectServiceDefinition;
import io.suboptimal.connectjava.protocol.server.ConnectServerInterceptor;

/**
* Immutable view of a Connect RPC call passed to each {@link io.suboptimal.connectjava.protocol.ConnectInterceptor}.
* Immutable view of a Connect RPC call passed to each {@link ConnectServerInterceptor}.
*
* <p>{@link #responseHeadersBuilder()} accepts mutations until the first response payload (or the
* terminal response for unary calls) is written; after that point any mutation throws
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package io.suboptimal.connectjava.api;

import io.suboptimal.connectjava.model.ConnectMethodDefinition;
import io.suboptimal.connectjava.model.ConnectServiceDefinition;
import org.jspecify.annotations.Nullable;

import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;


/**
* Outbound trigger written by the terminal handler to initiate a Connect RPC call.
*
* @param serviceDefinition the target service
* @param methodDefinition the target method
* @param requestHeaders user request headers; keys are normalized to lower case. Protocol-managed
* headers (content-type, content-length, encodings, etc.) are ignored.
* @param preferGet when {@code true} and the method is idempotent, the call is sent as a GET
* @param codecName payload codec to use (e.g. {@code "proto"} or {@code "json"}); when
* {@code null} the codec registry's preferred codec is used
* @param timeoutMs call timeout in milliseconds; when non-null the {@code connect-timeout-ms}
* request header is set; {@code null} means no timeout header is sent
*/
public record ConnectClientCallStart(
ConnectServiceDefinition serviceDefinition,
ConnectMethodDefinition methodDefinition,
Map<String, List<String>> requestHeaders,
boolean preferGet,
@Nullable String codecName,
@Nullable Long timeoutMs
) {
public ConnectClientCallStart {
Objects.requireNonNull(serviceDefinition);
Objects.requireNonNull(methodDefinition);
Objects.requireNonNull(requestHeaders);
requestHeaders = requestHeaders
.entrySet()
.stream()
.collect(Collectors.toUnmodifiableMap(e -> e.getKey().toLowerCase(Locale.ROOT),
e -> List.copyOf(e.getValue())));
}

public ConnectClientCallStart(
ConnectServiceDefinition serviceDefinition,
ConnectMethodDefinition methodDefinition,
Map<String, List<String>> requestHeaders,
boolean preferGet,
@Nullable String codecName
) {
this(serviceDefinition, methodDefinition, requestHeaders, preferGet, codecName, null);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package io.suboptimal.connectjava.api;

import io.suboptimal.connectjava.model.ConnectMethodDefinition;
import io.suboptimal.connectjava.model.ConnectServiceDefinition;
import org.jspecify.annotations.Nullable;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;

/**
* Mutable view of an outgoing {@link ConnectClientCallStart}, handed to client interceptors so
* they can shape the request in place (headers, timeout, codec, GET preference) instead of
* rebuilding an immutable record on every edit.
*
* <p>{@code serviceDefinition} and {@code methodDefinition} are read-only: an interceptor may
* read them but cannot change the call target. Not thread-safe; mutate only from the Netty event
* loop. Header names are treated case-insensitively (stored lower-cased).
*/
public final class ConnectClientCallStartBuilder {
private final ConnectServiceDefinition serviceDefinition;
private final ConnectMethodDefinition methodDefinition;
private final Map<String, List<String>> requestHeaders; // lower-cased keys, mutable
private boolean preferGet;
private @Nullable String codecName;
private @Nullable Long timeoutMs;

public ConnectClientCallStartBuilder(ConnectClientCallStart start) {
Objects.requireNonNull(start);
this.serviceDefinition = start.serviceDefinition();
this.methodDefinition = start.methodDefinition();
this.requestHeaders = new LinkedHashMap<>();
start.requestHeaders().forEach((k, v) ->
this.requestHeaders.put(k.toLowerCase(Locale.ROOT), new ArrayList<>(v)));
this.preferGet = start.preferGet();
this.codecName = start.codecName();
this.timeoutMs = start.timeoutMs();
}

public ConnectServiceDefinition serviceDefinition() { return serviceDefinition; }
public ConnectMethodDefinition methodDefinition() { return methodDefinition; }

public boolean preferGet() { return preferGet; }
public ConnectClientCallStartBuilder preferGet(boolean preferGet) {
this.preferGet = preferGet;
return this;
}

public @Nullable String codecName() { return codecName; }
public ConnectClientCallStartBuilder codecName(@Nullable String codecName) {
this.codecName = codecName;
return this;
}

public @Nullable Long timeoutMs() { return timeoutMs; }
public ConnectClientCallStartBuilder timeoutMs(@Nullable Long timeoutMs) {
this.timeoutMs = timeoutMs;
return this;
}

/** Returns the current values for {@code name} (case-insensitive); empty list if none. */
public List<String> headerValues(String name) {
List<String> values = requestHeaders.get(name.toLowerCase(Locale.ROOT));
return values != null ? List.copyOf(values) : List.of();
}

/** Appends {@code value} under {@code name} (case-insensitive), keeping existing values. */
public ConnectClientCallStartBuilder addHeader(String name, String value) {
Objects.requireNonNull(name);
Objects.requireNonNull(value);
requestHeaders.computeIfAbsent(name.toLowerCase(Locale.ROOT), k -> new ArrayList<>()).add(value);
return this;
}

/** Replaces all values for {@code name} (case-insensitive) with the single {@code value}. */
public ConnectClientCallStartBuilder setHeader(String name, String value) {
Objects.requireNonNull(name);
Objects.requireNonNull(value);
List<String> list = new ArrayList<>();
list.add(value);
requestHeaders.put(name.toLowerCase(Locale.ROOT), list);
return this;
}

/** Removes all values for {@code name} (case-insensitive). */
public ConnectClientCallStartBuilder removeHeader(String name) {
requestHeaders.remove(name.toLowerCase(Locale.ROOT));
return this;
}

/** Freezes the current state into an immutable {@link ConnectClientCallStart}. */
public ConnectClientCallStart build() {
return new ConnectClientCallStart(
serviceDefinition, methodDefinition, requestHeaders, preferGet, codecName, timeoutMs);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package io.suboptimal.connectjava.api;

import io.suboptimal.connectjava.model.ConnectMethodDefinition;
import io.suboptimal.connectjava.model.ConnectServiceDefinition;

public record ConnectClientResponseStart (
ConnectServiceDefinition serviceDefinition,
ConnectMethodDefinition methodDefinition,
ConnectResponseMeta responseMeta
) implements ConnectMessage {}
Original file line number Diff line number Diff line change
@@ -1,8 +1,24 @@
package io.suboptimal.connectjava.api;

import org.jspecify.annotations.Nullable;

import java.util.List;
import java.util.Map;

/**
* Terminal signal indicating the successful end of one side of an RPC payload stream.
* Terminal signal indicating the end of one side of an RPC payload stream.
*
* <p>For streaming calls the {@code trailers} field carries trailing metadata from the
* end-stream envelope, and {@code error} is non-null when the end-stream envelope carried
* an error. A consumer must treat this message as terminal in both cases: a non-null
* {@code error} means the call failed, and the trailers are still available.
* For unary calls use {@link #INSTANCE} (no trailers, no error).
*/
public record ConnectEndOfStream() implements ConnectMessage {
public static final ConnectEndOfStream INSTANCE = new ConnectEndOfStream();
public record ConnectEndOfStream(Map<String, List<String>> trailers, @Nullable ConnectError error)
implements ConnectMessage {
public static final ConnectEndOfStream INSTANCE = new ConnectEndOfStream(Map.of(), null);

public ConnectEndOfStream(Map<String, List<String>> trailers) {
this(trailers, null);
}
}
18 changes: 16 additions & 2 deletions src/main/java/io/suboptimal/connectjava/api/ConnectError.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,25 @@
/**
* Connect-native error.
*/
public record ConnectError(ConnectErrorCode code, String message, List<ConnectErrorDetail> details)
public record ConnectError(ConnectErrorCode code,
String message,
List<ConnectErrorDetail> details,
ConnectErrorOrigin origin)
implements ConnectMessage
{
/** Defaults {@code origin} to {@link ConnectErrorOrigin#RPC}. */
public ConnectError(ConnectErrorCode code, String message, List<ConnectErrorDetail> details) {
this(code, message, details, ConnectErrorOrigin.RPC);
}

/** Defaults {@code details} to empty and {@code origin} to {@link ConnectErrorOrigin#RPC}. */
public ConnectError(ConnectErrorCode code, String message) {
this(code, message, List.of());
this(code, message, List.of(), ConnectErrorOrigin.RPC);
}

/** Returns a copy of this error tagged with {@code origin}; all other fields unchanged. */
public ConnectError withOrigin(ConnectErrorOrigin origin) {
return new ConnectError(code, message, details, origin);
}

public static ConnectError canceled(String message) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.suboptimal.connectjava.api;

/**
* Distinguishes how a {@link ConnectError} arose on the client.
*
* <ul>
* <li>{@link #RPC} — a Connect RPC-level outcome: the server (or the client itself)
* produced a Connect error with a meaningful code. This is the default for every
* {@code ConnectError}.</li>
* <li>{@link #TRANSPORT} — a transport-level rejection at the gate: a non-200 HTTP
* response whose body is not a recognized Connect error (e.g. 404/415/405/505 with
* {@code text/plain}). The Connect RPC never started.</li>
* </ul>
*/
public enum ConnectErrorOrigin {
RPC,
TRANSPORT
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

public sealed interface ConnectMessage permits
ConnectCallExchange,
ConnectClientResponseStart,
ConnectPayload,
ConnectEndOfStream,
ConnectError
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package io.suboptimal.connectjava.api;

import io.suboptimal.connectjava.protocol.ConnectCallObserver;
import io.suboptimal.connectjava.protocol.server.ConnectServerCallObserver;

/**
* Mutable Connect response headers collected by {@link ConnectCallObserver}s.
* Mutable Connect response headers collected by {@link ConnectServerCallObserver}s.
*
* <p>Mutations are applied to the wire response after all header observers run. Operation
* order is preserved, including the difference between {@link #set(CharSequence, CharSequence)}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.suboptimal.connectjava.api;

import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;

/**
* Metadata from a Connect RPC response.
*
* @param statusCode HTTP status code
* @param headers leading metadata (HTTP response headers without {@code trailer-} prefix)
*/
public record ConnectResponseMeta(
int statusCode,
Map<String, List<String>> headers
) {
public ConnectResponseMeta {
headers = copyLower(headers);
}

private static Map<String, List<String>> copyLower(Map<String, List<String>> source) {
return source
.entrySet()
.stream()
.collect(Collectors.toUnmodifiableMap(e -> e.getKey().toLowerCase(Locale.ROOT),
Map.Entry::getValue));
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package io.suboptimal.connectjava.api;

import io.suboptimal.connectjava.protocol.ConnectCallObserver;
import io.suboptimal.connectjava.protocol.server.ConnectServerCallObserver;

/**
* Mutable Connect response trailers collected by {@link ConnectCallObserver}s.
* Mutable Connect response trailers collected by {@link ConnectServerCallObserver}s.
*
* <p>For unary RPCs, trailers are serialized as {@code Trailer-*} response headers. For
* streaming RPCs, trailers are serialized as the {@code metadata} object in the
Expand Down
Loading