Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 117 additions & 31 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,41 +37,99 @@
import org.checkerframework.checker.nullness.qual.Nullable;

/**
* A {@link Coder Coder<T>} defines how to encode and decode values of type {@code T} into
* byte streams.
* A {@link Coder} defines how values of type {@code T} are encoded into bytes and decoded back into
* objects.
*
* <p>{@link Coder} instances are serialized during job creation and deserialized before use. This
* will generally be performed by serializing the object via Java Serialization.
* <p>Coders are used by Beam to serialize data when it is transferred between transforms,
* persisted, or sent across process boundaries.
*
* <p>{@link Coder} classes for compound types are often composed from coder classes for types
* contains therein. The composition of {@link Coder} instances into a coder for the compound class
* is the subject of the {@link CoderProvider} type, which enables automatic generic composition of
* {@link Coder} classes within the {@link CoderRegistry}. See {@link CoderProvider} and {@link
* CoderRegistry} for more information about how coders are inferred.
* <p>The {@link #encode(Object, OutputStream, Context)} and {@link #decode(InputStream, Context)}
* methods must be consistent: values encoded by {@code encode} must be correctly reconstructed by
* {@code decode}.
*
* <p>All methods of a {@link Coder} are required to be thread safe.
* <p>The {@link Context} parameter specifies whether the value is encoded as a top-level element or
* as part of a larger structure. This affects whether additional information (such as length
* prefixes) is required to ensure that encoded values can be unambiguously decoded.
*
* @param <T> the type of values being encoded and decoded
* <p>For example:
*
* <ul>
* <li>In {@link Context#OUTER}, the value may consume the entire stream and does not require
* explicit length encoding.
* <li>In {@link Context#NESTED}, the value is part of a larger structure and must be encoded in a
* self-delimiting way.
* </ul>
*
* <p>Coder implementations must be:
*
* <ul>
* <li>Deterministic (when required by the pipeline)
* <li>Thread-safe
* <li>Consistent between encode and decode
* </ul>
*
* @param <T> the type of values handled by this {@link Coder}
*/
public abstract class Coder<T> implements Serializable {
/**
* The context in which encoding or decoding is being done.
* Represents the context in which encoding or decoding is performed.
*
* <p>The {@link Context} determines whether the value being encoded or decoded is part of a
* larger structure or is the outermost value in the stream.
*
* <p>This distinction is important because some coders need to include additional information
* (such as length prefixes) when values are nested inside other structures, but can omit them
* when operating on the outermost level.
*
* <p>There are two standard contexts:
*
* <ul>
* <li>{@link #OUTER} – Indicates that the value occupies the remainder of the input or output
* stream. In this case, coders may omit length information because the boundaries are
* implicitly known.
* <li>{@link #NESTED} – Indicates that the value is encoded as part of a larger structure.
* Coders must ensure that the encoded value is self-delimiting, typically by including
* length prefixes or other boundary markers.
* </ul>
*
* <p>For example:
*
* <ul>
* <li>When encoding a top-level element in a file → use {@code OUTER}
* <li>When encoding elements inside a collection (e.g., list, KV, etc.) → use {@code NESTED}
* </ul>
*
* <p>Correct usage of {@link Context} ensures that encoded data can be safely and correctly
* decoded without ambiguity.
*
* @deprecated To implement a coder, do not use any {@link Context}. Just implement only those
* abstract methods which do not accept a {@link Context} and leave the default
* implementations for methods accepting a {@link Context}.
* <p><b>Note:</b> Most coder implementations do not need to manually manage {@link Context}. They
* should delegate to component coders with the appropriate context when encoding nested
* structures.
*/
@Deprecated
public static class Context {
/**
* The outer context: the value being encoded or decoded takes up the remainder of the
* record/stream contents.
* The outer context indicates that the value being encoded or decoded occupies the remainder of
* the input or output stream.
*
* <p>In this context, the boundaries of the value are implicitly known, so coders do not need
* to include additional length information or delimiters when encoding.
*
* <p>This is typically used for top-level values, such as elements written directly to a file
* or stream.
*/
public static final Context OUTER = new Context(true);

/**
* The nested context: the value being encoded or decoded is (potentially) a part of a larger
* record/stream contents, and may have other parts encoded or decoded after it.
* The nested context indicates that the value being encoded or decoded is part of a larger
* structure and does not occupy the entire stream.
*
* <p>In this context, coders must ensure that the encoded value is self-delimiting, typically
* by including length prefixes or other boundary markers, so that subsequent data in the stream
* can be correctly decoded.
*
* <p>This is commonly used when encoding elements inside collections, key-value pairs, or other
* composite data structures.
*/
public static final Context NESTED = new Context(false);

Expand Down Expand Up @@ -112,13 +170,28 @@ public String toString() {
}

/**
* Encodes the given value of type {@code T} onto the given output stream. Multiple elements can
* be encoded next to each other on the output stream, each coder should encode information to
* know how many bytes to read when decoding. A common approach is to prefix the encoding with the
* element's encoded length.
* Encodes the given value of type {@code T} onto the provided output stream.
*
* @throws IOException if writing to the {@code OutputStream} fails for some reason
* @throws CoderException if the value could not be encoded for some reason
* <p>The encoding must be deterministic and consistent with {@link #decode}, such that values
* written by this method can be correctly reconstructed.
*
* <p>The {@link Context} determines how the value should be encoded:
*
* <ul>
* <li>In {@link Context#OUTER}, the value is written as a top-level element and may omit length
* prefixes or delimiters since it consumes the remainder of the stream.
* <li>In {@link Context#NESTED}, the value is part of a larger structure and must include
* sufficient boundary information (such as length prefixes) to allow correct decoding of
* subsequent data.
* </ul>
*
* <p>Implementations must ensure that the encoding is unambiguous and that multiple encoded
* values can be safely concatenated and decoded in sequence.
*
* @param value the value to encode
* @param outStream the output stream to write the encoded bytes to
* @throws IOException if writing to the stream fails
* @throws CoderException if the value cannot be encoded
*/
public abstract void encode(T value, OutputStream outStream) throws CoderException, IOException;

Expand All @@ -136,13 +209,26 @@ public void encode(T value, OutputStream outStream, Context context)
}

/**
* Decodes a value of type {@code T} from the given input stream in the given context. Returns the
* decoded value. Multiple elements can be encoded next to each other on the input stream, each
* coder should encode information to know how many bytes to read when decoding. A common approach
* is to prefix the encoding with the element's encoded length.
* Decodes a value of type {@code T} from the given input stream.
*
* @throws IOException if reading from the {@code InputStream} fails for some reason
* @throws CoderException if the value could not be decoded for some reason
* <p>The decoding must be consistent with {@link #encode}, such that values encoded by this coder
* can be correctly reconstructed.
*
* <p>When multiple values are encoded sequentially in a stream, implementations must read exactly
* the bytes corresponding to a single encoded value and no more. This ensures that subsequent
* values in the stream can be decoded correctly.
*
* <p>Depending on how the value was encoded, the implementation may rely on implicit boundaries
* (for outer context) or explicit boundary information such as length prefixes (for nested
* context).
*
* <p>Implementations must ensure that decoding is unambiguous and does not consume bytes beyond
* the encoded representation of the value.
*
* @param inStream the input stream to read the encoded value from
* @return the decoded value
* @throws IOException if reading from the stream fails
* @throws CoderException if the value cannot be decoded
*/
public abstract T decode(InputStream inStream) throws CoderException, IOException;

Expand Down
Loading