Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bootstrap Performance Improvement [PR3]: Adding File Store and Related Interfaces #2970

Closed
wants to merge 11 commits into from
16 changes: 16 additions & 0 deletions ambry-api/src/main/java/com/github/ambry/server/StoreManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.github.ambry.store.Store;
import com.github.ambry.store.StoreException;
import java.io.IOException;
import java.nio.file.FileStore;
import java.util.Collection;
import java.util.List;

Expand All @@ -34,6 +35,13 @@ public interface StoreManager {
*/
boolean addBlobStore(ReplicaId replica);

/**
* Add a new FileStore with given {@link ReplicaId}.
* @param replicaId the {@link ReplicaId} of the {@link FileStore} which would be added.
* @return {@code true} if adding FileStore was successful. {@code false} if not.
*/
boolean addFileStore(ReplicaId replicaId);

/**
* Remove store from storage manager.
* @param id the {@link PartitionId} associated with store
Expand Down Expand Up @@ -62,6 +70,14 @@ public interface StoreManager {
*/
Store getStore(PartitionId id);

/**
 * Get the {@link FileStore} hosting file copy data for the given partition.
 * @param id the {@link PartitionId} to find the store for.
 * @return the {@link FileStore} corresponding to the given {@link PartitionId}, or {@code null} if no store was found for
 * that partition, or that store was not started.
 */
FileStore getFileStore(PartitionId id);

/**
* Get replicaId on current node by partition name. (There should be at most one replica belonging to specific
* partition on single node)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.github.ambry.server.ServerErrorCode;
import com.github.ambry.server.StoreManager;
import com.github.ambry.store.Store;
import java.nio.file.FileStore;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -57,6 +58,11 @@ public boolean addBlobStore(ReplicaId replica) {
return createAndStartBlobStoreIfAbsent(replica.getPartitionId()) != null;
}

// This store manager does not support file copy based stores; always reports failure.
@Override
public boolean addFileStore(ReplicaId replicaId) {
return false;
}

@Override
public boolean shutdownBlobStore(PartitionId id) {
try {
Expand Down Expand Up @@ -84,6 +90,11 @@ public Store getStore(PartitionId id) {
return createAndStartBlobStoreIfAbsent(id);
}

// File copy based stores are not supported by this store manager; callers must
// handle the null return (per the StoreManager#getFileStore contract).
@Override
public FileStore getFileStore(PartitionId id) {
return null;
}

@Override
public boolean scheduleNextForCompaction(PartitionId id) {
throw new UnsupportedOperationException("Method not supported");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void initiateBootstrap(ReplicaId replicaId) {

@Override
public void initiateFileCopy(ReplicaId replicaId) {
// No-op placeholder: to be implemented along with the file copy protocol.
}

@Override
Expand Down Expand Up @@ -108,7 +108,7 @@ public void waitBootstrapCompleted(String partitionName) throws InterruptedExcep

@Override
public void waitForFileCopyCompleted(String partitionName) throws InterruptedException {
// No-op placeholder: to be implemented along with the file copy protocol.
}

@Override
Expand Down Expand Up @@ -204,7 +204,7 @@ public void onBootstrapComplete(ReplicaId replicaId) {

@Override
public void onFileCopyComplete(ReplicaId replicaId) {
// No-op placeholder: to be implemented along with the file copy protocol.
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/**
* Copyright 2019 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.github.ambry.protocol;

import com.github.ambry.clustermap.ClusterMap;
import com.github.ambry.clustermap.PartitionId;
import com.github.ambry.utils.Utils;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.charset.Charset;


public class FileCopyProtocolGetChunkRequest extends RequestOrResponse{
private PartitionId partitionId;
private String fileName;
private long startOffset;
private long sizeInBytes;
private static final short File_Chunk_Request_Version_V1 = 1;
private static final int File_Name_Size_In_Bytes = 4;


public FileCopyProtocolGetChunkRequest( short versionId, int correlationId,
String clientId, PartitionId partitionId, String fileName, long startOffset, long sizeInBytes) {
super(RequestOrResponseType.FileCopyProtocolGetChunkRequest, versionId, correlationId, clientId);
if(partitionId == null || fileName.isEmpty() || startOffset < 0 || sizeInBytes < 0){
throw new IllegalArgumentException("PartitionId, FileName, StartOffset and SizeInBytes cannot be null or negative");
}
this.partitionId = partitionId;
this.fileName = fileName;
this.startOffset = startOffset;
this.sizeInBytes = sizeInBytes;
}

public static FileCopyProtocolGetChunkRequest readFrom(DataInputStream stream, ClusterMap clusterMap)
throws IOException {
Short versionId = stream.readShort();
validateVersion(versionId);
int correlationId = stream.readInt();
String clientId = Utils.readIntString(stream);
PartitionId partitionId = clusterMap.getPartitionIdFromStream(stream);
String fileName = Utils.readIntString(stream);
long startOffset = stream.readLong();
long sizeInBytes = stream.readLong();
return new FileCopyProtocolGetChunkRequest(versionId, correlationId, clientId, partitionId, fileName, startOffset, sizeInBytes);
}

protected void prepareBuffer(){
super.prepareBuffer();
bufferToSend.writeBytes(partitionId.getBytes());
Utils.serializeString(bufferToSend, fileName, Charset.defaultCharset());
bufferToSend.writeLong(startOffset);
bufferToSend.writeLong(sizeInBytes);
}

public String toString(){
StringBuilder sb = new StringBuilder();
sb.append("FileCopyProtocolGetChunkRequest[")
.append("PartitionId=").append(partitionId)
.append(", FileName=").append(fileName)
.append(", StartOffset=").append(startOffset)
.append(", SizeInBytes=").append(sizeInBytes)
.append("]");
return sb.toString();
}

public long sizeInBytes() {
return super.sizeInBytes() + partitionId.getBytes().length + File_Name_Size_In_Bytes + fileName.length() + Long.BYTES + Long.BYTES;
}

public PartitionId getPartitionId() { return partitionId; }
public String getFileName() { return fileName; }
public long getStartOffset() { return startOffset; }
public long getSizeInBytes() { return sizeInBytes; }

static void validateVersion(short version){
if (version != File_Chunk_Request_Version_V1) {
throw new IllegalArgumentException("Unknown version for FileMetadataRequest: " + version);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/**
* Copyright 2019 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.github.ambry.protocol;

import com.github.ambry.clustermap.ClusterMap;
import com.github.ambry.clustermap.PartitionId;
import com.github.ambry.server.ServerErrorCode;
import com.github.ambry.utils.Utils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.charset.Charset;


public class FileCopyProtocolGetChunkResponse extends Response{
private PartitionId partitionId;
private String fileName;
protected ByteBuf chunkData;
private final long startOffset;
private final long chunkSizeInBytes;
private final boolean isLastChunk;
private static final int File_Name_Field_Size_In_Bytes = 4;

public FileCopyProtocolGetChunkResponse(short versionId, int correlationId, String clientId, ServerErrorCode errorCode,
PartitionId partitionId, String fileName, ByteBuf chunkData, long startOffset, long chunkSizeInBytes, boolean isLastChunk) {
super(RequestOrResponseType.FileCopyProtocolGetChunkResponse, versionId, correlationId, clientId, errorCode);
this.partitionId = partitionId;
this.fileName = fileName;
this.chunkData = chunkData;
this.startOffset = startOffset;
this.chunkSizeInBytes = chunkSizeInBytes;
this.isLastChunk = isLastChunk;
}

public static FileCopyProtocolGetChunkResponse readFrom(DataInputStream stream, ClusterMap clusterMap) throws IOException {
RequestOrResponseType type = RequestOrResponseType.values()[stream.readShort()];
if (type != RequestOrResponseType.FileCopyProtocolGetChunkResponse) {
throw new IllegalArgumentException("The type of request response is not compatible");
}
short versionId = stream.readShort();
int correlationId = stream.readInt();
String clientId = Utils.readIntString(stream);
ServerErrorCode errorCode = ServerErrorCode.values()[stream.readShort()];

if(errorCode != ServerErrorCode.No_Error){
return new FileCopyProtocolGetChunkResponse(versionId, correlationId, clientId, errorCode,
null, null, null, -1, -1, false);
}

PartitionId partitionId = clusterMap.getPartitionIdFromStream(stream);
String fileName = Utils.readIntString(stream);
long startOffset = stream.readLong();
long sizeInBytes = stream.readLong();
boolean isLastChunk = stream.readBoolean();
int chunkLengthInBytes = stream.readInt();
ByteBuf chunk = Unpooled.buffer(chunkLengthInBytes);
for(int i = 0; i < chunkLengthInBytes; i++){
chunk.writeByte(stream.readByte());
}
return new FileCopyProtocolGetChunkResponse(versionId, correlationId, clientId, errorCode, partitionId, fileName, chunk, startOffset, sizeInBytes, isLastChunk);
}
public long sizeInBytes() {
return super.sizeInBytes() + partitionId.getBytes().length + File_Name_Field_Size_In_Bytes + fileName.length() + Long.BYTES + Long.BYTES + 1 + Integer.BYTES + chunkData.readableBytes();
}
public void prepareBuffer(){
super.prepareBuffer();
bufferToSend.writeBytes(partitionId.getBytes());
Utils.serializeString(bufferToSend, fileName, Charset.defaultCharset());
bufferToSend.writeLong(startOffset);
bufferToSend.writeLong(chunkSizeInBytes);
bufferToSend.writeBoolean(isLastChunk);
bufferToSend.writeInt(chunkData.readableBytes());
bufferToSend.writeBytes(chunkData);
}
public PartitionId getPartitionId() { return partitionId; }
public String getFileName() { return fileName; }
public ByteBuf getChunk() { return chunkData; }
public long getStartOffset() { return startOffset; }
public long getChunkSizeInBytes() { return chunkSizeInBytes; }
public boolean isLastChunk() { return isLastChunk; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/**
* Copyright 2019 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.github.ambry.protocol;

import com.github.ambry.clustermap.ClusterMap;
import com.github.ambry.clustermap.PartitionId;
import com.github.ambry.utils.Utils;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.charset.Charset;

public class FileCopyProtocolGetMetaDataRequest extends RequestOrResponse{
private PartitionId partitionId;
private String hostName;
private static final short File_Metadata_Request_Version_V1 = 1;
private static final int HostName_Field_Size_In_Bytes = 4;

public FileCopyProtocolGetMetaDataRequest(short versionId, int correlationId, String clientId,
PartitionId partitionId, String hostName) {
super(RequestOrResponseType.FileCopyProtocolGetMetaDataRequest, versionId, correlationId, clientId);
if (partitionId == null || hostName.isEmpty()) {
throw new IllegalArgumentException("Partition and Host Name cannot be null");
}
this.partitionId = partitionId;
this.hostName = hostName;
}

public String getHostName() {
return hostName;
}

public PartitionId getPartitionId() {
return partitionId;
}

protected static FileCopyProtocolGetMetaDataRequest readFrom(DataInputStream stream, ClusterMap clusterMap) throws IOException {
Short versionId = stream.readShort();
validateVersion(versionId);
int correlationId = stream.readInt();
String clientId = Utils.readIntString(stream);
String hostName = Utils.readIntString(stream);
PartitionId partitionId = clusterMap.getPartitionIdFromStream(stream);
return new FileCopyProtocolGetMetaDataRequest(versionId, correlationId, clientId, partitionId, hostName);
}

public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("FileMetaDataRequest[").append("PartitionId=").append(partitionId).append(", HostName=").append(hostName)
.append("]");
return sb.toString();
}

public long sizeInBytes() {
return super.sizeInBytes() + HostName_Field_Size_In_Bytes + hostName.length() + partitionId.getBytes().length;
}

protected void prepareBuffer() {
super.prepareBuffer();
Utils.serializeString(bufferToSend, hostName, Charset.defaultCharset());
bufferToSend.writeBytes(partitionId.getBytes());
}

static void validateVersion(short version) {
if (version != File_Metadata_Request_Version_V1) {
throw new IllegalArgumentException("Unknown version for FileMetadataRequest: " + version);
}
}
}
Loading
Loading