Skip to content
6 changes: 5 additions & 1 deletion docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ value, it is recommended to only populate overridden properties in the custom `a
| `hedera.mirror.importer.downloader.balance.writeFiles` | false | Whether to write verified stream files to the filesystem. |
| `hedera.mirror.importer.downloader.balance.writeSignatures` | false | Whether to write verified signature files to the filesystem. |
| `hedera.mirror.importer.downloader.batchSize` | 25 | The number of signature files to download per node before downloading the signed files |
| `hedera.mirror.importer.downloader.block.enabled` | false | Whether to enable block stream files poller |
| `hedera.mirror.importer.downloader.block.frequency` | 100ms | The fixed period between invocations. Can accept duration units like `10s`, `2m`, etc. If not specified, millisecond is implied as the unit. |
| `hedera.mirror.importer.downloader.block.persistBytes` | false | Whether to persist the block stream file bytes to the database. |
| `hedera.mirror.importer.downloader.block.writeFiles` | false | Whether to write verified block stream files to the filesystem. |
| `hedera.mirror.importer.downloader.bucketName` | | The cloud storage bucket name to download streamed files. This value takes priority over network hardcoded bucket names regardless of `hedera.mirror.importer.network` value. |
| `hedera.mirror.importer.downloader.cloudProvider` | S3 | The cloud provider to download files from. Either `GCP`, `LOCAL`, or `S3`. |
| `hedera.mirror.importer.downloader.consensusRatio` | 0.33333333333 | The ratio of verified nodes (nodes used to come to consensus on the signature file hash) to total number of nodes available. |
Expand Down Expand Up @@ -192,7 +196,7 @@ value, it is recommended to only populate overridden properties in the custom `a
| `hedera.mirror.importer.topicRunningHashV2AddedTimestamp` | Network-based | Unix timestamp (in nanos) of first topic message with v2 as running hash version. Use this config to override the default network based value |
| `hedera.mirror.importer.shard` | 0 | The default shard number that the component participates in |
| `hedera.mirror.importer.startDate` | | The start date (inclusive) of the data to import. It takes effect 1) if it's set and the date is after the last downloaded file or the database is empty; 2) if it's not set and the database is empty, it defaults to now. Format: YYYY-MM-ddTHH:mm:ss.nnnnnnnnnZ |
| `hedera.mirror.importer.startBlockNumber` | null | The block number that will be set as the downloaded stream files starting index. |
| `hedera.mirror.importer.startBlockNumber` | null | The block number that will be set as the downloaded stream files starting index. For block stream files, it's the first block to download and ignored if there are existing blocks in database. |

### Transaction and Entity Filtering

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@

public interface StreamFile<T extends StreamItem> {

default void clear() {
default StreamFile<T> clear() {
setBytes(null);
setItems(List.of());
return this;

Check warning on line 28 in hedera-mirror-common/src/main/java/com/hedera/mirror/common/domain/StreamFile.java

View check run for this annotation

Codecov / codecov/patch

hedera-mirror-common/src/main/java/com/hedera/mirror/common/domain/StreamFile.java#L28

Added line #L28 was not covered by tests
}

StreamFile<T> copy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,6 @@ public enum StreamType {

public static final String SIGNATURE_SUFFIX = "_sig";

private static final String PARSED = "parsed";
private static final String SIGNATURES = "signatures";

private final SortedSet<Extension> dataExtensions;
private final String nodePrefix;
private final String path;
Expand Down Expand Up @@ -78,14 +75,6 @@ public enum StreamType {
.collect(ImmutableSortedSet.toImmutableSortedSet(Comparator.naturalOrder()));
}

public String getParsed() {
return PARSED;
}

public String getSignatures() {
return SIGNATURES;
}

public boolean isChained() {
return this != BALANCE;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.hedera.mirror.common.domain.transaction;

import static org.apache.commons.lang3.StringUtils.leftPad;

import com.hedera.hapi.block.stream.output.protoc.BlockHeader;
import com.hedera.hapi.block.stream.protoc.BlockProof;
import com.hedera.hapi.block.stream.protoc.RecordFileItem;
Expand All @@ -38,6 +40,10 @@
@NoArgsConstructor
public class BlockFile implements StreamFile<BlockItem> {

private static final int BASENAME_LENGTH = 36;
private static final char BASENAME_PADDING = '0';
private static final String COMPRESSED_FILE_SUFFIX = ".blk.gz";

// Contains the block number and the previous block hash
private BlockHeader blockHeader;

Expand Down Expand Up @@ -86,6 +92,14 @@ public class BlockFile implements StreamFile<BlockItem> {

private int version;

public static String getBlockStreamFilename(long blockNumber) {
if (blockNumber < 0) {
throw new IllegalArgumentException("Block number must be non-negative");
}

return leftPad(Long.toString(blockNumber), BASENAME_LENGTH, BASENAME_PADDING) + COMPRESSED_FILE_SUFFIX;
}

@Override
public StreamFile<BlockItem> copy() {
return this.toBuilder().build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,11 @@ public class RecordFile implements StreamFile<RecordItem> {
private int version;

@Override
public void clear() {
public RecordFile clear() {
StreamFile.super.clear();
setLogsBloom(null);
setSidecars(List.of());
return this;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,21 @@
package com.hedera.mirror.common.domain.transaction;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import org.junit.jupiter.api.Test;

class BlockFileTest {

@Test
void getBlockStreamFilename() {
assertThat(BlockFile.getBlockStreamFilename(0)).isEqualTo("000000000000000000000000000000000000.blk.gz");
assertThat(BlockFile.getBlockStreamFilename(1)).isEqualTo("000000000000000000000000000000000001.blk.gz");
assertThatThrownBy(() -> BlockFile.getBlockStreamFilename(-1))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Block number must be non-negative");
}

@Test
void onNewRound() {
var blockFile = BlockFile.builder().onNewRound(1L).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public class ImporterProperties {

private Instant startDate;

@Min(0)
private Long startBlockNumber;

private Long topicRunningHashV2AddedTimestamp;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ public class MetricsExecutionInterceptor implements ExecutionInterceptor {
private static final Pattern ENTITY_ID_PATTERN =
Pattern.compile("/(balance|record)(s_)?(\\d{1,10})\\.\\d{1,10}\\.(\\d{1,10})");
private static final Pattern SIDECAR_PATTERN = Pattern.compile("Z_\\d{1,2}\\.rcd");
private static final Pattern NODE_ID_PATTERN = Pattern.compile("[^/]/(\\d{1,10})/(\\d{1,10})/(balance|record)/");
private static final Pattern NODE_ID_PATTERN =
Pattern.compile("[^/]/+(\\d{1,10})/(\\d{1,10})/((balance|record)/)?");

private final MeterRegistry meterRegistry;

Expand Down Expand Up @@ -130,18 +131,18 @@ private UriAttributes getUriAttributes(URI uri) {
var action = getAction(uriComponent);

Matcher accountIdMatcher = ENTITY_ID_PATTERN.matcher(uriComponent);
if (accountIdMatcher.find() && accountIdMatcher.groupCount() == 4) {
if (accountIdMatcher.find()) {
var shard = accountIdMatcher.group(3);
var nodeId = String.valueOf(Long.parseLong(accountIdMatcher.group(4)) - 3L);
var streamType = accountIdMatcher.group(1);
return new UriAttributes(action, nodeId, shard, streamType.toUpperCase());
}

Matcher nodeIdMatcher = NODE_ID_PATTERN.matcher(uriComponent);
if (nodeIdMatcher.find() && nodeIdMatcher.groupCount() == 3) {
if (nodeIdMatcher.find()) {
var shard = nodeIdMatcher.group(1);
var nodeId = nodeIdMatcher.group(2);
var streamType = nodeIdMatcher.group(3);
var streamType = nodeIdMatcher.group(4) != null ? nodeIdMatcher.group(4) : StreamType.BLOCK.name();
return new UriAttributes(action, nodeId, shard, streamType.toUpperCase());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import com.google.common.base.Splitter;
import com.hedera.mirror.common.domain.StreamType;
import com.hedera.mirror.common.domain.transaction.BlockFile;
import com.hedera.mirror.importer.downloader.provider.S3StreamFileProvider;
import com.hedera.mirror.importer.exception.InvalidStreamFileException;
import java.time.Instant;
Expand Down Expand Up @@ -125,6 +126,10 @@ private StreamFilename(String path, String filename, String pathSeparator) {
this.filePath = builder.toString();
}

public static StreamFilename from(long blockNumber) {
return from(BlockFile.getBlockStreamFilename(blockNumber));
}

public static StreamFilename from(String filePath) {
return from(filePath, S3StreamFileProvider.SEPARATOR);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public abstract class Downloader<T extends StreamFile<I>, I extends StreamItem>
protected final StreamFileReader<T, ?> streamFileReader;
protected final StreamFileNotifier streamFileNotifier;
protected final DateRangeCalculator dateRangeCalculator;
protected final AtomicReference<Optional<T>> lastStreamFile = new AtomicReference<>(Optional.empty());
protected final AtomicReference<Optional<StreamFile<I>>> lastStreamFile = new AtomicReference<>(Optional.empty());

private final ConsensusNodeService consensusNodeService;
private final StreamType streamType;
Expand Down Expand Up @@ -268,7 +268,7 @@ private StreamFilename getStartAfterFilename() {
return lastStreamFile
.get()
.or(() -> {
Optional<T> streamFile = dateRangeCalculator.getLastStreamFile(streamType);
Optional<StreamFile<I>> streamFile = dateRangeCalculator.getLastStreamFile(streamType);
lastStreamFile.compareAndSet(Optional.empty(), streamFile);
return streamFile;
})
Expand Down Expand Up @@ -404,8 +404,7 @@ protected void onVerified(StreamFileData streamFileData, T streamFile, Consensus
downloadLatencyMetric.record(Duration.between(consensusEnd, Instant.now()));

// Cache a copy of the streamFile with bytes and items set to null so as not to keep them in memory
var copy = (T) streamFile.copy();
copy.clear();
var copy = streamFile.copy().clear();
lastStreamFile.set(Optional.of(copy));
}

Expand Down Expand Up @@ -508,11 +507,11 @@ private Collection<ConsensusNode> partialCollection(Collection<ConsensusNode> al
return allNodes;
}

var nodes = new ArrayList<ConsensusNode>(allNodes);
var nodes = new ArrayList<>(allNodes);
// shuffle nodes into a random order
Collections.shuffle(nodes);

long totalStake = nodes.get(0).getTotalStake();
long totalStake = nodes.getFirst().getTotalStake();
// only keep "just enough" nodes to reach/exceed downloadRatio
long neededStake = BigDecimal.valueOf(totalStake)
.multiply(downloadRatio)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright (C) 2025 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.mirror.importer.downloader;

public interface StreamPoller {
void poll();
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public RecordFile transform(BlockFile blockFile) {
.hapiVersionMinor(minor)
.hapiVersionPatch(patch)
.hash(blockFile.getHash())
.index(blockHeader.getNumber())
.index(blockFile.getIndex())
.items(getRecordItems(blockFile.getItems(), hapiVersion))
.loadEnd(blockFile.getLoadEnd())
.loadStart(blockFile.getLoadStart())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (C) 2025 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.mirror.importer.downloader.block;

import jakarta.validation.constraints.NotNull;
import java.time.Duration;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import org.springframework.validation.annotation.Validated;

@Component("blockPollerProperties")
@ConfigurationProperties("hedera.mirror.importer.downloader.block")
@Data
@RequiredArgsConstructor
@Validated
public class BlockPollerProperties {

private boolean enabled = false;

@NotNull
private Duration frequency = Duration.ofMillis(100L);

private boolean persistBytes = false;

private boolean writeFiles = false;
}
Loading
Loading