Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.segment.loading;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;

import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.Map;

/**
* Base for {@link LoadSpec} wrappers that carry partial-load metadata (a fingerprint identifying the request the
* coordinator made plus a raw {@code delegate} inner load spec describing where the segment data lives) alongside
* scheme-specific data in concrete subtypes (projection names, row ranges, etc.). The {@code delegate} is held as a
* raw {@link Map} so the concrete backend type (e.g. {@code s3}, {@code local}, {@code hdfs}) is only materialized
* when needed via {@link #materializedDelegate()}; this avoids pulling backend-specific dependencies onto every node
* that touches the wire form.
* <p>
* The default {@link #loadSegment(File)} and {@link #openRangeReader()} delegate verbatim to the materialized inner
* load spec — i.e., a full load. Subtypes override these only when they actually implement scheme-specific partial
* loading; until then, historicals fall back to a full download and the announcement-side path stamps the
* wrapper's fingerprint plus the full segment size on the load announcement so the coordinator counts the replica
* as a satisfying full-fallback rather than re-queueing the load.
* <p>
* Wire-format conventions required of all subtypes (enforced by code that inspects raw {@code Map}-form load specs):
* <ul>
* <li>The Jackson {@code @JsonTypeName} starts with {@link #TYPE_PREFIX} ({@code "partial"}).</li>
* <li>The wire form includes {@code fingerprint} ({@link String}) and {@code delegate} ({@link Map}) at the top
* level. These are provided by {@link #getFingerprint()} and {@link #getDelegate()} on this base class, so
* subtypes inherit the contract automatically.</li>
* </ul>
* Together these let callers identify any partial-load wrapper from its raw map form without enumerating concrete
* subtypes.
*/
public abstract class PartialLoadSpec implements LoadSpec
{
  /**
   * Jackson type-name prefix shared by every partial-load {@link LoadSpec} wire type. Both map-form detectors on
   * this class key off this prefix.
   */
  public static final String TYPE_PREFIX = "partial";

  /**
   * Checks whether a raw map-form load spec has the complete shape of a {@link PartialLoadSpec} subtype.
   * Detection is purely by convention (no list of known subtypes is consulted); all of the following must hold:
   * <ul>
   * <li>{@code type} is a {@link String} that starts with {@link #TYPE_PREFIX};</li>
   * <li>{@code fingerprint} is a {@link String};</li>
   * <li>{@code delegate} is a {@link Map}.</li>
   * </ul>
   * The contents of {@code delegate} are not inspected.
   *
   * @param loadSpec raw map form of a load spec, may be {@code null}
   * @return {@code true} only if {@code loadSpec} is non-null and satisfies every condition above
   */
  public static boolean detectPartialLoadSpec(@Nullable Map<String, Object> loadSpec)
  {
    // The type-prefix test (including the null check) is shared with hasPartialTypePrefix.
    if (!hasPartialTypePrefix(loadSpec)) {
      return false;
    }
    return loadSpec.get("fingerprint") instanceof String
           && loadSpec.get("delegate") instanceof Map;
  }

  /**
   * Checks only whether the {@code type} field of a raw map-form load spec is a {@link String} starting with
   * {@link #TYPE_PREFIX}; the {@code fingerprint} and {@code delegate} fields are not examined. Combined with
   * {@link #detectPartialLoadSpec(Map)}, this lets a caller tell "not a partial-load wrapper at all" apart from
   * "claims to be partial but the rest of the wire form is missing or malformed".
   *
   * @param loadSpec raw map form of a load spec, may be {@code null}
   * @return {@code true} if {@code loadSpec} is non-null and its {@code type} carries the partial prefix
   */
  public static boolean hasPartialTypePrefix(@Nullable Map<String, Object> loadSpec)
  {
    if (loadSpec == null) {
      return false;
    }
    final Object declaredType = loadSpec.get("type");
    return declaredType instanceof String typeName && typeName.startsWith(TYPE_PREFIX);
  }

  private final Map<String, Object> delegate;
  private final String fingerprint;
  private final Supplier<LoadSpec> materializedDelegateSupplier;

  /**
   * Stores the raw inner load spec and fingerprint, and prepares (but does not run) the conversion of the raw
   * {@code delegate} map into a typed {@link LoadSpec}.
   *
   * @param delegate    raw map form of the inner load spec, must not be null
   * @param fingerprint fingerprint carried by this wrapper, must not be null
   * @param jsonMapper  mapper used later to convert {@code delegate} into a {@link LoadSpec}, must not be null
   * @throws NullPointerException if any argument is null
   */
  protected PartialLoadSpec(Map<String, Object> delegate, String fingerprint, ObjectMapper jsonMapper)
  {
    // Validate everything first, in the same order the fields are declared, before touching any state.
    Preconditions.checkNotNull(delegate, "delegate");
    Preconditions.checkNotNull(fingerprint, "fingerprint");
    Preconditions.checkNotNull(jsonMapper, "jsonMapper");

    this.delegate = delegate;
    this.fingerprint = fingerprint;
    // Conversion is deferred until first use and the result is cached by the memoizing supplier.
    this.materializedDelegateSupplier = Suppliers.memoize(
        () -> jsonMapper.convertValue(delegate, LoadSpec.class)
    );
  }

  /**
   * Returns the raw map form of the inner load spec, exactly as passed to the constructor. Serialized as the
   * {@code delegate} JSON property.
   */
  @JsonProperty
  public final Map<String, Object> getDelegate()
  {
    return delegate;
  }

  /**
   * Returns the fingerprint passed to the constructor. Serialized as the {@code fingerprint} JSON property.
   */
  @JsonProperty
  public final String getFingerprint()
  {
    return fingerprint;
  }

  /**
   * Returns the inner {@link LoadSpec} obtained by converting the raw {@code delegate} map with the
   * {@link ObjectMapper} supplied at construction. The conversion is lazy and memoized. It backs the default
   * {@link #loadSegment(File)} and {@link #openRangeReader()} implementations and is available to subtypes.
   */
  protected LoadSpec materializedDelegate()
  {
    return materializedDelegateSupplier.get();
  }

  /**
   * Default implementation: forwards unchanged to the materialized inner load spec.
   */
  @Override
  public LoadSpecResult loadSegment(File destDir) throws SegmentLoadingException
  {
    final LoadSpec inner = materializedDelegate();
    return inner.loadSegment(destDir);
  }

  /**
   * Default implementation: forwards unchanged to the materialized inner load spec, so the result is whatever
   * that spec returns (possibly {@code null}).
   */
  @Override
  @Nullable
  public SegmentRangeReader openRangeReader() throws IOException
  {
    final LoadSpec inner = materializedDelegate();
    return inner.openRangeReader();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,36 +25,47 @@
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import org.apache.druid.utils.CollectionUtils;

import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
* A {@link LoadSpec} wrapper that carries partial-projection metadata from the coordinator to a historical alongside
* the original backend-specific load spec. The wrapped {@code delegate} is held as a raw {@link Map} so that the
* concrete backend type (e.g. {@code s3}, {@code local}, {@code hdfs}) is materialized only when needed; this avoids
* pulling backend-specific dependencies onto every node that touches the wire form.
* A {@link PartialLoadSpec} that requests partial loading of a segment's projections. The base class carries the
* common {@code fingerprint} and {@code delegate} wire fields; this subtype adds the resolved projection names that
* the historical should range-read into the local segment.
* <p>
* Both {@link #loadSegment(File)} and {@link #openRangeReader()} delegate verbatim to the inner load spec. The
* historical-side partial-load path inspects this wrapper at mount time to learn which projections to range-read and
* the fingerprint identifying the request the coordinator made.
* The historical-side partial-load path inspects this wrapper at mount time. Until that path exists, the base
* class's default {@link #loadSegment} performs a full download via the inner delegate, and the announcement layer
* stamps the fingerprint + full size on the response so the coordinator's reconciler counts the replica as a
* satisfying full-fallback rather than re-queuing the load.
*/
@JsonTypeName(PartialProjectionLoadSpec.TYPE)
public class PartialProjectionLoadSpec implements LoadSpec
public class PartialProjectionLoadSpec extends PartialLoadSpec
{
public static final String TYPE = "partialProjection";

private final Map<String, Object> delegate;
/**
 * Assembles the raw wire-form {@link Map} of a {@link PartialProjectionLoadSpec} request without constructing the
 * typed class. Intended for the coordinator-side matcher, which would otherwise need an {@link ObjectMapper}
 * plumbed through solely to satisfy the typed constructor's lazy-delegate supplier.
 * <p>
 * The returned map is unmodifiable and holds the given {@code delegate} and {@code projections} by reference.
 *
 * @param delegate    raw map form of the inner load spec, stored under {@code "delegate"}
 * @param projections projection names, stored under {@code "projections"}
 * @param fingerprint request fingerprint, stored under {@code "fingerprint"}
 * @return a four-entry map whose {@code "type"} is {@link #TYPE}
 * @throws NullPointerException if any argument is null
 */
public static Map<String, Object> wireForm(
    Map<String, Object> delegate,
    List<String> projections,
    String fingerprint
)
{
  return Map.ofEntries(
      Map.entry("type", TYPE),
      Map.entry("delegate", delegate),
      Map.entry("projections", projections),
      Map.entry("fingerprint", fingerprint)
  );
}

private final List<String> projections;
private final String fingerprint;
private final Supplier<LoadSpec> materializedDelegateSupplier;

@JsonCreator
public PartialProjectionLoadSpec(
Expand All @@ -64,21 +75,12 @@ public PartialProjectionLoadSpec(
@JacksonInject ObjectMapper jsonMapper
)
{
Preconditions.checkNotNull(jsonMapper, "jsonMapper");
this.delegate = Preconditions.checkNotNull(delegate, "delegate");
super(delegate, fingerprint, jsonMapper);
Preconditions.checkArgument(
!CollectionUtils.isNullOrEmpty(projections),
"projections must not be null or empty"
);
this.projections = List.copyOf(projections);
this.fingerprint = Preconditions.checkNotNull(fingerprint, "fingerprint");
this.materializedDelegateSupplier = Suppliers.memoize(() -> jsonMapper.convertValue(delegate, LoadSpec.class));
}

@JsonProperty
public Map<String, Object> getDelegate()
{
return delegate;
}

@JsonProperty
Expand All @@ -87,25 +89,6 @@ public List<String> getProjections()
return projections;
}

@JsonProperty
public String getFingerprint()
{
return fingerprint;
}

@Override
public LoadSpecResult loadSegment(File destDir) throws SegmentLoadingException
{
return materializedDelegateSupplier.get().loadSegment(destDir);
}

@Override
@Nullable
public SegmentRangeReader openRangeReader() throws IOException
{
return materializedDelegateSupplier.get().openRangeReader();
}

@Override
public boolean equals(Object o)
{
Expand All @@ -116,24 +99,24 @@ public boolean equals(Object o)
return false;
}
PartialProjectionLoadSpec that = (PartialProjectionLoadSpec) o;
return Objects.equals(delegate, that.delegate)
return Objects.equals(getDelegate(), that.getDelegate())
&& Objects.equals(projections, that.projections)
&& Objects.equals(fingerprint, that.fingerprint);
&& Objects.equals(getFingerprint(), that.getFingerprint());
}

@Override
public int hashCode()
{
return Objects.hash(delegate, projections, fingerprint);
return Objects.hash(getDelegate(), projections, getFingerprint());
}

@Override
public String toString()
{
return "PartialProjectionLoadSpec{" +
"delegate=" + delegate +
"delegate=" + getDelegate() +
", projections=" + projections +
", fingerprint=" + fingerprint +
", fingerprint=" + getFingerprint() +
'}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.client;

import com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.druid.server.coordination.SegmentChangeRequestLoad;
import org.apache.druid.server.coordinator.loading.PartialLoadProfile;
import org.apache.druid.timeline.DataSegment;

import javax.annotation.Nullable;
import java.util.Objects;

/**
* A {@link DataSegment} carrying optional partial-load metadata describing how it was loaded onto a particular server.
* Extends {@code DataSegment} so the inventory ({@link DruidDataSource} / {@link ImmutableDruidDataSource}) can store
* either a bare {@code DataSegment} (full-load, the common case) or a {@code DataSegmentAndLoadProfile} (partial-load)
* in a single map slot. Full-load segments pay zero per-segment wrapper overhead; only segments with a non-null
* profile allocate the subclass.
* <p>
* Inherits all {@code DataSegment} JSON serialization, equality, hashCode, and ordering. The added profile field is
* marked to be ignored by JSON serialization since the profile travels via {@link SegmentChangeRequestLoad} wire
* fields, not as a segment property.
*/
public class DataSegmentAndLoadProfile extends DataSegment
{
  /**
   * Partial-load profile attached to this segment. Annotated {@link JsonIgnore}; always non-null once constructed.
   */
  @JsonIgnore
  private final PartialLoadProfile profile;

  /**
   * Builds a copy of {@code delegate} (field by field, through the {@link DataSegment} constructor, using
   * {@code PruneSpecsHolder.DEFAULT}) and attaches {@code profile} to it.
   *
   * @param delegate segment whose properties are copied
   * @param profile  partial-load profile to attach, must not be null
   * @throws NullPointerException if {@code profile} is null
   */
  public DataSegmentAndLoadProfile(DataSegment delegate, PartialLoadProfile profile)
  {
    super(
        delegate.getDataSource(),
        delegate.getInterval(),
        delegate.getVersion(),
        delegate.getLoadSpec(),
        delegate.getDimensions(),
        delegate.getMetrics(),
        delegate.getProjections(),
        delegate.getShardSpec(),
        delegate.getLastCompactionState(),
        delegate.getBinaryVersion(),
        delegate.getSize(),
        delegate.getTotalRows(),
        delegate.getIndexingStateFingerprint(),
        PruneSpecsHolder.DEFAULT
    );
    this.profile = Objects.requireNonNull(profile, "profile");
  }

  /**
   * Returns the {@link PartialLoadProfile} attached to this segment. Never null; a segment without a profile is
   * represented by a plain {@link DataSegment} instead of this class.
   */
  public PartialLoadProfile profile()
  {
    return profile;
  }

  /**
   * Returns the size to account for this segment: {@link PartialLoadProfile#loadedBytes()} when the profile
   * reports a value, otherwise the full {@link DataSegment#getSize()}.
   */
  public long effectiveSize()
  {
    final Long bytesFromProfile = profile.loadedBytes();
    if (bytesFromProfile == null) {
      // The profile does not report a loaded size; fall back to the segment's full size.
      return getSize();
    }
    return bytesFromProfile;
  }

  /**
   * Extracts the partial-load profile from {@code segment}.
   *
   * @param segment any segment, typically a value read from a {@code Map<SegmentId, DataSegment>} inventory
   * @return the attached profile when {@code segment} is a {@link DataSegmentAndLoadProfile}, otherwise
   *         {@code null}
   */
  @Nullable
  public static PartialLoadProfile profileOf(DataSegment segment)
  {
    if (segment instanceof DataSegmentAndLoadProfile withProfile) {
      return withProfile.profile();
    }
    return null;
  }

  /**
   * Computes the size to account for {@code segment}.
   *
   * @param segment any segment
   * @return {@link #effectiveSize()} when {@code segment} is a {@link DataSegmentAndLoadProfile}, otherwise its
   *         full {@link DataSegment#getSize() size}
   */
  public static long effectiveSizeOf(DataSegment segment)
  {
    if (segment instanceof DataSegmentAndLoadProfile withProfile) {
      return withProfile.effectiveSize();
    }
    return segment.getSize();
  }
}
Loading
Loading