Commit
Merge remote-tracking branch 'origin/trunk' into refine-blocks-to-reconstruct-logic
VicoWu committed Jul 25, 2024
2 parents 5de85a0 + 4525c7e commit 0582ce1
Showing 78 changed files with 4,658 additions and 492 deletions.
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ClosedIOException.java
@@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/**
* Exception to denote that the underlying stream, cache or other closeable
* resource is closed.
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public class ClosedIOException extends PathIOException {

/**
* Appends the custom error message to the default error message.
* @param path path that encountered the closed resource.
* @param message custom error message.
*/
public ClosedIOException(String path, String message) {
super(path, message);
}
}
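
A minimal usage sketch of the new exception; the ExampleStream class and its checkOpen() helper below are hypothetical illustrations, not part of this commit:

import java.io.IOException;

import org.apache.hadoop.fs.ClosedIOException;

/** Hypothetical stream that rejects reads once closed. */
class ExampleStream {
  private final String path;
  private volatile boolean closed;

  ExampleStream(String path) {
    this.path = path;
  }

  /** Throw ClosedIOException if close() has already been called. */
  private void checkOpen() throws IOException {
    if (closed) {
      throw new ClosedIOException(path, "Stream is closed");
    }
  }

  int read() throws IOException {
    checkOpen();
    return -1; // this sketch holds no data; a real stream would return bytes
  }

  void close() {
    closed = true;
  }
}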
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java
@@ -1022,6 +1022,7 @@ public class CommonConfigurationKeysPublic {
"fs.s3a.*.server-side-encryption.key",
"fs.s3a.encryption.algorithm",
"fs.s3a.encryption.key",
"fs.s3a.encryption.context",
"fs.azure\\.account.key.*",
"credential$",
"oauth.*secret",
hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -742,6 +742,7 @@
fs.s3a.*.server-side-encryption.key
fs.s3a.encryption.algorithm
fs.s3a.encryption.key
fs.s3a.encryption.context
fs.s3a.secret.key
fs.s3a.*.secret.key
fs.s3a.session.key
@@ -1760,6 +1761,15 @@
</description>
</property>

<property>
<name>fs.s3a.encryption.context</name>
<description>Specific encryption context to use if fs.s3a.encryption.algorithm
has been set to 'SSE-KMS' or 'DSSE-KMS'. The value of this property is a set
of non-secret, comma-separated key-value pairs of additional contextual
information about the data, with each key and value joined by an equals sign (=).
</description>
</property>

<property>
<name>fs.s3a.signing-algorithm</name>
<description>Override the default signing algorithm so legacy
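
A configuration sketch showing the expected format of the new property; the context keys "Department" and "Project" are illustrative values, not defaults shipped with Hadoop:

<property>
  <name>fs.s3a.encryption.algorithm</name>
  <value>SSE-KMS</value>
</property>

<property>
  <name>fs.s3a.encryption.context</name>
  <value>Department=Finance,Project=Orion</value>
</property>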
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -1471,6 +1471,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_JOURNALNODE_SYNC_INTERVAL_KEY =
"dfs.journalnode.sync.interval";
public static final long DFS_JOURNALNODE_SYNC_INTERVAL_DEFAULT = 2*60*1000L;
public static final String DFS_JOURNALNODE_ENABLE_SYNC_FORMAT_KEY =
"dfs.journalnode.enable.sync.format";
public static final boolean DFS_JOURNALNODE_ENABLE_SYNC_FORMAT_DEFAULT = false;
public static final String DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY =
"dfs.journalnode.edit-cache-size.bytes";

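
A sketch of enabling the new JournalNode option in hdfs-site.xml; per DFS_JOURNALNODE_ENABLE_SYNC_FORMAT_DEFAULT above it defaults to false, so formatting via the syncer stays off unless set explicitly:

<property>
  <name>dfs.journalnode.enable.sync.format</name>
  <value>true</value>
</property>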
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/InterQJournalProtocol.java
@@ -20,6 +20,7 @@

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.StorageInfoProto;
import org.apache.hadoop.hdfs.qjournal.server.JournalNode;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
import org.apache.hadoop.security.KerberosInfo;
@@ -51,4 +52,13 @@ GetEditLogManifestResponseProto getEditLogManifestFromJournal(
String jid, String nameServiceId, long sinceTxId, boolean inProgressOk)
throws IOException;

/**
* Get the storage info for the specified journal.
* @param jid the journal identifier
* @param nameServiceId the name service id
* @return the storage info object
* @throws IOException if the storage info cannot be retrieved
*/
StorageInfoProto getStorageInfo(String jid, String nameServiceId)
throws IOException;

}
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/InterQJournalProtocolServerSideTranslatorPB.java
@@ -24,6 +24,8 @@

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocol;
import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.StorageInfoProto;
import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocolProtos.GetStorageInfoRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;

@@ -60,4 +62,18 @@ public GetEditLogManifestResponseProto getEditLogManifestFromJournal(
throw new ServiceException(e);
}
}

@Override
public StorageInfoProto getStorageInfo(
RpcController controller, GetStorageInfoRequestProto request)
throws ServiceException {
try {
return impl.getStorageInfo(
request.getJid().getIdentifier(),
request.hasNameServiceId() ? request.getNameServiceId() : null
);
} catch (IOException e) {
throw new ServiceException(e);
}
}
}
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/InterQJournalProtocolTranslatorPB.java
@@ -19,6 +19,8 @@

package org.apache.hadoop.hdfs.qjournal.protocolPB;

import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.StorageInfoProto;
import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocolProtos;
import org.apache.hadoop.thirdparty.protobuf.RpcController;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@@ -75,6 +77,18 @@ public GetEditLogManifestResponseProto getEditLogManifestFromJournal(
req.build()));
}

@Override
public StorageInfoProto getStorageInfo(String jid, String nameServiceId)
throws IOException {
InterQJournalProtocolProtos.GetStorageInfoRequestProto.Builder req =
InterQJournalProtocolProtos.GetStorageInfoRequestProto.newBuilder()
.setJid(convertJournalId(jid));
if (nameServiceId != null) {
req.setNameServiceId(nameServiceId);
}
return ipc(() -> rpcProxy.getStorageInfo(NULL_CONTROLLER, req.build()));
}

private QJournalProtocolProtos.JournalIdProto convertJournalId(String jid) {
return QJournalProtocolProtos.JournalIdProto.newBuilder()
.setIdentifier(jid)
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.qjournal.server;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.StorageInfoProto;
import org.apache.hadoop.thirdparty.protobuf.BlockingService;
import org.slf4j.Logger;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -71,14 +72,14 @@ public class JournalNodeRpcServer implements QJournalProtocol,

JournalNodeRpcServer(Configuration conf, JournalNode jn) throws IOException {
this.jn = jn;

Configuration confCopy = new Configuration(conf);

// Ensure that nagling doesn't kick in, which could cause latency issues.
confCopy.setBoolean(
CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_KEY,
true);

InetSocketAddress addr = getAddress(confCopy);
String bindHost = conf.getTrimmed(DFS_JOURNALNODE_RPC_BIND_HOST_KEY, null);
if (bindHost == null) {
@@ -104,7 +105,7 @@ public class JournalNodeRpcServer implements QJournalProtocol,
this.handlerCount = confHandlerCount;
LOG.info("The number of JournalNodeRpcServer handlers is {}.",
this.handlerCount);

this.server = new RPC.Builder(confCopy)
.setProtocol(QJournalProtocolPB.class)
.setInstance(service)
@@ -149,15 +150,15 @@ void start() {
public InetSocketAddress getAddress() {
return server.getListenerAddress();
}

void join() throws InterruptedException {
this.server.join();
}

void stop() {
this.server.stop();
}

static InetSocketAddress getAddress(Configuration conf) {
String addr = conf.get(
DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY,
@@ -211,7 +212,7 @@ public void journal(RequestInfo reqInfo,
jn.getOrCreateJournal(reqInfo.getJournalId(), reqInfo.getNameServiceId())
.journal(reqInfo, segmentTxId, firstTxnId, numTxns, records);
}

@Override
public void heartbeat(RequestInfo reqInfo) throws IOException {
jn.getOrCreateJournal(reqInfo.getJournalId(), reqInfo.getNameServiceId())
@@ -245,17 +246,24 @@ public GetEditLogManifestResponseProto getEditLogManifest(
String jid, String nameServiceId,
long sinceTxId, boolean inProgressOk)
throws IOException {

RemoteEditLogManifest manifest = jn.getOrCreateJournal(jid, nameServiceId)
.getEditLogManifest(sinceTxId, inProgressOk);

return GetEditLogManifestResponseProto.newBuilder()
.setManifest(PBHelper.convert(manifest))
.setHttpPort(jn.getBoundHttpAddress().getPort())
.setFromURL(jn.getHttpServerURI())
.build();
}

@Override
public StorageInfoProto getStorageInfo(String jid,
String nameServiceId) throws IOException {
StorageInfo storage = jn.getOrCreateJournal(jid, nameServiceId).getStorage();
return PBHelper.convert(storage);
}

@Override
public GetJournaledEditsResponseProto getJournaledEdits(String jid,
String nameServiceId, long sinceTxId, int maxTxns) throws IOException {
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java
@@ -18,6 +18,9 @@
package org.apache.hadoop.hdfs.qjournal.server;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@@ -79,6 +82,7 @@ public class JournalNodeSyncer {
private int numOtherJNs;
private int journalNodeIndexForSync = 0;
private final long journalSyncInterval;
private final boolean tryFormatting;
private final int logSegmentTransferTimeout;
private final DataTransferThrottler throttler;
private final JournalMetrics metrics;
@@ -98,6 +102,9 @@ public class JournalNodeSyncer {
logSegmentTransferTimeout = conf.getInt(
DFSConfigKeys.DFS_EDIT_LOG_TRANSFER_TIMEOUT_KEY,
DFSConfigKeys.DFS_EDIT_LOG_TRANSFER_TIMEOUT_DEFAULT);
tryFormatting = conf.getBoolean(
DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_FORMAT_KEY,
DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_FORMAT_DEFAULT);
throttler = getThrottler(conf);
metrics = journal.getMetrics();
journalSyncerStarted = false;
@@ -171,6 +178,8 @@ private void startSyncJournalsDaemon() {
// Wait for journal to be formatted to create edits.sync directory
while(!journal.isFormatted()) {
try {
// Format the journal with namespace info from the other JNs if it is not formatted
formatWithSyncer();
Thread.sleep(journalSyncInterval);
} catch (InterruptedException e) {
LOG.error("JournalNodeSyncer daemon received Runtime exception.", e);
@@ -187,7 +196,15 @@ private void startSyncJournalsDaemon() {
while(shouldSync) {
try {
if (!journal.isFormatted()) {
LOG.warn("Journal cannot sync. Not formatted.");
LOG.warn("Journal cannot sync. Not formatted. Trying to format with the syncer");
formatWithSyncer();
if (journal.isFormatted() && !createEditsSyncDir()) {
LOG.error("Failed to create directory for downloading log " +
"segments: {}. Stopping Journal Node Sync.",
journal.getStorage().getEditsSyncDir());
return;
}
continue;
} else {
syncJournals();
}
@@ -233,6 +250,68 @@ private void syncJournals() {
journalNodeIndexForSync = (journalNodeIndexForSync + 1) % numOtherJNs;
}

private void formatWithSyncer() {
if (!tryFormatting) {
return;
}
LOG.info("Trying to format the journal with the syncer");
try {
StorageInfo storage = null;
for (JournalNodeProxy jnProxy : otherJNProxies) {
if (!hasEditLogs(jnProxy)) {
// This avoids a race condition between `hdfs namenode -format` and
// JN syncer by checking if the other JN is not newly formatted.
continue;
}
try {
HdfsServerProtos.StorageInfoProto storageInfoResponse =
jnProxy.jnProxy.getStorageInfo(jid, nameServiceId);
storage = PBHelper.convert(
storageInfoResponse, HdfsServerConstants.NodeType.JOURNAL_NODE
);
if (storage.getNamespaceID() == 0) {
LOG.error("Got invalid StorageInfo from " + jnProxy);
storage = null;
continue;
}
LOG.info("Got StorageInfo " + storage + " from " + jnProxy);
break;
} catch (IOException e) {
LOG.error("Could not get StorageInfo from " + jnProxy, e);
}
}
if (storage == null) {
LOG.error("Could not get StorageInfo from any JournalNode. " +
"JournalNodeSyncer cannot format the journal.");
return;
}
NamespaceInfo nsInfo = new NamespaceInfo(storage);
journal.format(nsInfo, true);
} catch (IOException e) {
LOG.error("Exception in formatting the journal with the syncer", e);
}
}

private boolean hasEditLogs(JournalNodeProxy journalProxy) {
GetEditLogManifestResponseProto editLogManifest;
try {
editLogManifest = journalProxy.jnProxy.getEditLogManifestFromJournal(
jid, nameServiceId, 0, false);
} catch (IOException e) {
LOG.error("Could not get edit log manifest from " + journalProxy, e);
return false;
}

List<RemoteEditLog> otherJournalEditLogs = PBHelper.convert(
editLogManifest.getManifest()).getLogs();
if (otherJournalEditLogs == null || otherJournalEditLogs.isEmpty()) {
LOG.warn("Journal at " + journalProxy.jnAddr + " has no edit logs");
return false;
}

return true;
}

private void syncWithJournalAtIndex(int index) {
LOG.info("Syncing Journal " + jn.getBoundIpcAddress().getAddress() + ":"
+ jn.getBoundIpcAddress().getPort() + " with "
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/InterQJournalProtocol.proto
@@ -31,8 +31,15 @@ package hadoop.hdfs.qjournal;
import "HdfsServer.proto";
import "QJournalProtocol.proto";

message GetStorageInfoRequestProto {
required JournalIdProto jid = 1;
optional string nameServiceId = 2;
}

service InterQJournalProtocolService {
rpc getEditLogManifestFromJournal(GetEditLogManifestRequestProto)
returns (GetEditLogManifestResponseProto);

rpc getStorageInfo(GetStorageInfoRequestProto)
returns (StorageInfoProto);
}