Skip to content

Commit

Permalink
HDFS-17509. RBF: Fix ClientProtocol.concat will throw NPE if tgr is a…
Browse files Browse the repository at this point in the history
… empty file. (#6784)
  • Loading branch information
LiuGuH committed May 17, 2024
1 parent 3c00093 commit 8f92cda
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashMap;
Expand Down Expand Up @@ -667,39 +668,28 @@ public void rename2(final String src, final String dst,
public void concat(String trg, String[] src) throws IOException {
rpcServer.checkOperation(NameNode.OperationCategory.WRITE);

// See if the src and target files are all in the same namespace
LocatedBlocks targetBlocks = getBlockLocations(trg, 0, 1);
if (targetBlocks == null) {
throw new IOException("Cannot locate blocks for target file - " + trg);
}
LocatedBlock lastLocatedBlock = targetBlocks.getLastLocatedBlock();
String targetBlockPoolId = lastLocatedBlock.getBlock().getBlockPoolId();
for (String source : src) {
LocatedBlocks sourceBlocks = getBlockLocations(source, 0, 1);
if (sourceBlocks == null) {
throw new IOException(
"Cannot located blocks for source file " + source);
}
String sourceBlockPoolId =
sourceBlocks.getLastLocatedBlock().getBlock().getBlockPoolId();
if (!sourceBlockPoolId.equals(targetBlockPoolId)) {
throw new IOException("Cannot concatenate source file " + source
+ " because it is located in a different namespace"
+ " with block pool id " + sourceBlockPoolId
+ " from the target file with block pool id "
+ targetBlockPoolId);
}
// Concat only effects when all files in the same namespace.
RemoteLocation targetDestination = getFileRemoteLocation(trg);
if (targetDestination == null) {
throw new IOException("Cannot find target file - " + trg);
}
String targetNameService = targetDestination.getNameserviceId();

// Find locations in the matching namespace.
final RemoteLocation targetDestination =
rpcServer.getLocationForPath(trg, true, targetBlockPoolId);
String[] sourceDestinations = new String[src.length];
for (int i = 0; i < src.length; i++) {
String sourceFile = src[i];
RemoteLocation location =
rpcServer.getLocationForPath(sourceFile, true, targetBlockPoolId);
sourceDestinations[i] = location.getDest();
RemoteLocation srcLocation = getFileRemoteLocation(sourceFile);
if (srcLocation == null) {
throw new IOException("Cannot find source file - " + sourceFile);
}
sourceDestinations[i] = srcLocation.getDest();

if (!targetNameService.equals(srcLocation.getNameserviceId())) {
throw new IOException("Cannot concatenate source file " + sourceFile
+ " because it is located in a different namespace" + " with nameservice "
+ srcLocation.getNameserviceId() + " from the target file with nameservice "
+ targetNameService);
}
}
// Invoke
RemoteMethod method = new RemoteMethod("concat",
Expand Down Expand Up @@ -1009,6 +999,28 @@ public HdfsFileStatus getFileInfo(String src) throws IOException {
return ret;
}

public RemoteLocation getFileRemoteLocation(String path) throws IOException {
rpcServer.checkOperation(NameNode.OperationCategory.READ);

final List<RemoteLocation> locations = rpcServer.getLocationsForPath(path, false, false);
if (locations.size() == 1) {
return locations.get(0);
}
RemoteLocation remoteLocation = null;
for (RemoteLocation location : locations) {
RemoteMethod method =
new RemoteMethod("getFileInfo", new Class<?>[] {String.class}, new RemoteParam());
HdfsFileStatus ret = rpcClient.invokeSequential(Collections.singletonList(location), method,
HdfsFileStatus.class, null);
if (ret != null) {
remoteLocation = location;
break;
}
}

return remoteLocation;
}

@Override
public boolean isFileClosed(String src) throws IOException {
rpcServer.checkOperation(NameNode.OperationCategory.READ);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1697,42 +1697,6 @@ public Long getNextSPSPath() throws IOException {
return nnProto.getNextSPSPath();
}

/**
* Locate the location with the matching block pool id.
*
* @param path Path to check.
* @param failIfLocked Fail the request if locked (top mount point).
* @param blockPoolId The block pool ID of the namespace to search for.
* @return Prioritized list of locations in the federated cluster.
* @throws IOException if the location for this path cannot be determined.
*/
protected RemoteLocation getLocationForPath(
String path, boolean failIfLocked, String blockPoolId)
throws IOException {

final List<RemoteLocation> locations =
getLocationsForPath(path, failIfLocked);

String nameserviceId = null;
Set<FederationNamespaceInfo> namespaces =
this.namenodeResolver.getNamespaces();
for (FederationNamespaceInfo namespace : namespaces) {
if (namespace.getBlockPoolId().equals(blockPoolId)) {
nameserviceId = namespace.getNameserviceId();
break;
}
}
if (nameserviceId != null) {
for (RemoteLocation location : locations) {
if (location.getNameserviceId().equals(nameserviceId)) {
return location;
}
}
}
throw new IOException(
"Cannot locate a nameservice for block pool " + blockPoolId);
}

/**
* Get the possible locations of a path in the federated cluster.
* During the get operation, it will do the quota verification.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1163,6 +1163,21 @@ public void testProxyGetPreferedBlockSize() throws Exception {
routerProtocol, nnProtocol, m, new Object[] {badPath});
}

private void testConcat(
String source, String target, boolean failureExpected, boolean verfiyException, String msg) {
boolean failure = false;
try {
// Concat test file with fill block length file via router
routerProtocol.concat(target, new String[] {source});
} catch (IOException ex) {
failure = true;
if (verfiyException) {
assertExceptionContains(msg, ex);
}
}
assertEquals(failureExpected, failure);
}

private void testConcat(
String source, String target, boolean failureExpected) {
boolean failure = false;
Expand Down Expand Up @@ -1224,6 +1239,27 @@ public void testProxyConcatFile() throws Exception {
String badPath = "/unknownlocation/unknowndir";
compareResponses(routerProtocol, nnProtocol, m,
new Object[] {badPath, new String[] {routerFile}});

// Test when concat trg is a empty file
createFile(routerFS, existingFile, existingFileSize);
String sameRouterEmptyFile =
cluster.getFederatedTestDirectoryForNS(sameNameservice) +
"_newemptyfile";
createFile(routerFS, sameRouterEmptyFile, 0);
// Concat in same namespaces, succeeds
testConcat(existingFile, sameRouterEmptyFile, false);
FileStatus mergedStatus = getFileStatus(routerFS, sameRouterEmptyFile);
assertEquals(existingFileSize, mergedStatus.getLen());

// Test when concat srclist has some empty file, namenode will throw IOException.
String srcEmptyFile = cluster.getFederatedTestDirectoryForNS(sameNameservice) + "_srcEmptyFile";
createFile(routerFS, srcEmptyFile, 0);
String targetFile = cluster.getFederatedTestDirectoryForNS(sameNameservice) + "_targetFile";
createFile(routerFS, targetFile, existingFileSize);
// Concat in same namespaces, succeeds
testConcat(srcEmptyFile, targetFile, true, true,
"org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.HadoopIllegalArgumentException): concat: source file "
+ srcEmptyFile + " is invalid or empty or underConstruction");
}

@Test
Expand Down

0 comments on commit 8f92cda

Please sign in to comment.