Merge pull request #57 from juhoautio-rovio/hdfs_storage_options

More config options for HDFS storage (e.g. Kerberos)
juhoautio-rovio authored Sep 11, 2024
2 parents b41cc3b + 0124cd2 commit 31153c7
Showing 3 changed files with 97 additions and 19 deletions.
15 changes: 11 additions & 4 deletions README.md
@@ -314,14 +314,22 @@ These are the options for `DruidSource`, to be passed with `write.options()`.

| Property | Description |
| --- |--- |
| `druid.segment_storage.s3.bucket` | S3 bucket name for the Deep Storage | |
| `druid.segment_storage.s3.basekey` | S3 key prefix for the Deep Storage. No trailing slashes. | |
| `druid.segment_storage.s3.bucket` | S3 bucket name for the Deep Storage |
| `druid.segment_storage.s3.basekey` | S3 key prefix for the Deep Storage. No trailing slashes. |

2. **If Deep Storage is `local`:**

| Property | Description |
| --- |--- |
| `druid.segment_storage.local.dir` | For local Deep Storage, absolute path to segment directory | |
| `druid.segment_storage.local.dir` | For local Deep Storage, absolute path to segment directory |

3. **If Deep Storage is `hdfs`:**

| Property | Description |
| --- | --- |
| `druid.segment_storage.hdfs.dir` | For hdfs Deep Storage, absolute path to segment directory |
| `druid.segment_storage.hdfs.security.kerberos.principal` | Kerberos principal |
| `druid.segment_storage.hdfs.security.kerberos.keytab` | Kerberos keytab |
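
For illustration only (not part of this commit): with these options, a Spark write to Druid using Kerberos-secured HDFS deep storage could look roughly like the sketch below. The format name, paths, principal and keytab are assumed placeholders, and the remaining mandatory `DruidSource` options are omitted.

```java
// Illustrative sketch only: names, paths, principal and keytab are placeholders,
// and the remaining mandatory DruidSource options are omitted.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public final class HdfsKerberosWriteExample {
    static void writeToDruid(Dataset<Row> rows) {
        rows.write()
            .format("com.rovio.ingest.DruidSource") // assumed fully-qualified name of the source
            .option("druid.segment_storage.type", "hdfs")
            .option("druid.segment_storage.hdfs.dir", "hdfs://namenode:8020/druid/segments")
            .option("druid.segment_storage.hdfs.security.kerberos.principal", "druid@EXAMPLE.COM")
            .option("druid.segment_storage.hdfs.security.kerberos.keytab", "/etc/security/keytabs/druid.keytab")
            .mode(SaveMode.Overwrite)
            .save();
    }
}
```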

#### Optional properties

@@ -336,7 +344,6 @@ These are the options for `DruidSource`, to be passed with `write.options()`.
| `druid.memory.max_rows` | Max number of rows to keep in memory in spark data writer | `75000` |
| `druid.segment_storage.type` | Type of Deep Storage to use. Allowed values: `s3`, `local`, `hdfs`. | `s3` |
| `druid.segment_storage.s3.disableacl` | Whether to disable ACL in S3 config. | `false` |
| `druid.segment_storage.hdfs.dir` | Hdfs segment storage location | `""` |
| `druid.datasource.init` | Boolean flag for (re-)initializing Druid datasource. If `true`, any pre-existing segments for the datasource are marked as unused. | `false` |
| `druid.bitmap_factory` | Compression format for bitmap indexes. Possible values: `concise`, `roaring`. For type `roaring`, the boolean property compressRunOnSerialization is always set to `true`. `rovio-ingest` uses `concise` by default regardless of Druid library version. | `concise` |
| `druid.segment.rollup` | Whether to rollup data during ingestion | `true` |
49 changes: 40 additions & 9 deletions src/main/java/com/rovio/ingest/WriterContext.java
@@ -55,6 +55,12 @@ public class WriterContext implements Serializable {
private final String s3BaseKey;
private final boolean s3DisableAcl;
private final String localDir;
private final String hdfsDir;
private final String hdfsCoreSitePath;
private final String hdfsHdfsSitePath;
private final String hdfsDefaultFS;
private final String hdfsSecurityKerberosPrincipal;
private final String hdfsSecurityKerberosKeytab;
private final String deepStorageType;
private final boolean initDataSource;
private final String version;
@@ -64,7 +70,6 @@ public class WriterContext implements Serializable {
private final String dimensionsSpec;
private final String metricsSpec;
private final String transformSpec;
private String getHdfsStorageDir;

private WriterContext(CaseInsensitiveStringMap options, String version) {
this.dataSource = getOrThrow(options, ConfKeys.DATA_SOURCE);
@@ -97,7 +102,12 @@ private WriterContext(CaseInsensitiveStringMap options, String version) {
this.s3BaseKey = options.getOrDefault(ConfKeys.DEEP_STORAGE_S3_BASE_KEY, null);
this.s3DisableAcl = options.getBoolean(ConfKeys.DEEP_STORAGE_S3_DISABLE_ACL, false);
this.localDir = options.getOrDefault(ConfKeys.DEEP_STORAGE_LOCAL_DIRECTORY, null);
this.getHdfsStorageDir = options.getOrDefault(ConfKeys.DEEP_STORAGE_HDFS_DIRECTORY, null);
this.hdfsDir = options.getOrDefault(ConfKeys.DEEP_STORAGE_HDFS_STORAGE_DIRECTORY, null);
this.hdfsCoreSitePath = options.getOrDefault(ConfKeys.DEEP_STORAGE_HDFS_CORE_SITE_PATH, null);
this.hdfsHdfsSitePath = options.getOrDefault(ConfKeys.DEEP_STORAGE_HDFS_HDFS_SITE_PATH, null);
this.hdfsDefaultFS = options.getOrDefault(ConfKeys.DEEP_STORAGE_HDFS_DEFAULT_FS, null);
this.hdfsSecurityKerberosPrincipal = options.getOrDefault(ConfKeys.DEEP_STORAGE_HDFS_SECURITY_KERBEROS_PRINCIPAL, null);
this.hdfsSecurityKerberosKeytab = options.getOrDefault(ConfKeys.DEEP_STORAGE_HDFS_SECURITY_KERBEROS_KEYTAB, null);

this.deepStorageType = options.getOrDefault(ConfKeys.DEEP_STORAGE_TYPE, DEFAULT_DRUID_DEEP_STORAGE_TYPE);
Preconditions.checkArgument(Arrays.asList("s3", "local", "hdfs").contains(this.deepStorageType),
@@ -194,8 +204,28 @@ public String getLocalDir() {
return localDir;
}

public String getDeepStorageType() {
return deepStorageType;
public String getHdfsDir() {
return hdfsDir;
}

public String getHdfsCoreSitePath() {
return hdfsCoreSitePath;
}

public String getHdfsHdfsSitePath() {
return hdfsHdfsSitePath;
}

public String getHdfsDefaultFS() {
return hdfsDefaultFS;
}

public String getHdfsSecurityKerberosPrincipal() {
return hdfsSecurityKerberosPrincipal;
}

public String getHdfsSecurityKerberosKeytab() {
return hdfsSecurityKerberosKeytab;
}

public boolean isInitDataSource() {
@@ -238,10 +268,6 @@ public String getTransformSpec() {
return transformSpec;
}

public String getHdfsStorageDir() {
return getHdfsStorageDir;
}

public static class ConfKeys {
public static final String DATASOURCE_INIT = "druid.datasource.init";
// Segment config
@@ -274,6 +300,11 @@ public static class ConfKeys {
// Local config (only for testing)
public static final String DEEP_STORAGE_LOCAL_DIRECTORY = "druid.segment_storage.local.dir";
// HDFS config
public static final String DEEP_STORAGE_HDFS_DIRECTORY = "druid.segment_storage.hdfs.dir";
public static final String DEEP_STORAGE_HDFS_STORAGE_DIRECTORY = "druid.segment_storage.hdfs.dir";
public static final String DEEP_STORAGE_HDFS_CORE_SITE_PATH = "druid.segment_storage.hdfs.core.site.path";
public static final String DEEP_STORAGE_HDFS_HDFS_SITE_PATH = "druid.segment_storage.hdfs.hdfs.site.path";
public static final String DEEP_STORAGE_HDFS_DEFAULT_FS = "druid.segment_storage.hdfs.default.fs";
public static final String DEEP_STORAGE_HDFS_SECURITY_KERBEROS_PRINCIPAL = "druid.segment_storage.hdfs.security.kerberos.principal";
public static final String DEEP_STORAGE_HDFS_SECURITY_KERBEROS_KEYTAB = "druid.segment_storage.hdfs.security.kerberos.keytab";
}
}
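
On the Java side, the same settings map onto the new `ConfKeys` constants. A minimal sketch (values are placeholders; how `rovio-ingest` actually turns such a map into a `WriterContext` is not shown in this diff):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.sql.util.CaseInsensitiveStringMap;

import com.rovio.ingest.WriterContext.ConfKeys;

public final class HdfsOptionsSketch {
    // Builds the write options that WriterContext reads for Kerberos-secured HDFS deep storage.
    static CaseInsensitiveStringMap hdfsOptions() {
        Map<String, String> opts = new HashMap<>();
        opts.put(ConfKeys.DEEP_STORAGE_TYPE, "hdfs");
        opts.put(ConfKeys.DEEP_STORAGE_HDFS_STORAGE_DIRECTORY, "hdfs://namenode:8020/druid/segments");    // placeholder
        opts.put(ConfKeys.DEEP_STORAGE_HDFS_SECURITY_KERBEROS_PRINCIPAL, "druid@EXAMPLE.COM");             // placeholder
        opts.put(ConfKeys.DEEP_STORAGE_HDFS_SECURITY_KERBEROS_KEYTAB, "/etc/security/keytabs/druid.keytab"); // placeholder
        return new CaseInsensitiveStringMap(opts);
    }
}
```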
52 changes: 46 additions & 6 deletions src/main/java/com/rovio/ingest/util/SegmentStorageUpdater.java
@@ -28,6 +28,8 @@
import org.apache.druid.storage.hdfs.HdfsDataSegmentKiller;
import org.apache.druid.storage.hdfs.HdfsDataSegmentPusher;
import org.apache.druid.storage.hdfs.HdfsDataSegmentPusherConfig;
import org.apache.druid.storage.hdfs.HdfsKerberosConfig;
import org.apache.druid.storage.hdfs.HdfsStorageAuthentication;
import org.apache.druid.storage.s3.NoopServerSideEncryption;
import org.apache.druid.storage.s3.S3DataSegmentKiller;
import org.apache.druid.storage.s3.S3DataSegmentPusher;
@@ -47,7 +49,16 @@ public static DataSegmentPusher createPusher(WriterContext param) {
if (param.isLocalDeepStorage()) {
return new LocalDataSegmentPusher(getLocalConfig(param.getLocalDir()));
} else if (param.isHdfsDeepStorage()) {
return new HdfsDataSegmentPusher(getHdfsConfig(param.getHdfsStorageDir()), new Configuration(), MAPPER);
return new HdfsDataSegmentPusher(
getHdfsConfig(
param.getHdfsDir(),
param.getHdfsSecurityKerberosPrincipal(),
param.getHdfsSecurityKerberosKeytab(),
getHdfsHadoopConfiguration(param.getHdfsCoreSitePath(), param.getHdfsHdfsSitePath(), param.getHdfsDefaultFS())
),
getHdfsHadoopConfiguration(param.getHdfsCoreSitePath(), param.getHdfsHdfsSitePath(), param.getHdfsDefaultFS()),
MAPPER
);
} else {
ServerSideEncryptingAmazonS3 serverSideEncryptingAmazonS3 = getAmazonS3().get();
S3DataSegmentPusherConfig s3Config = new S3DataSegmentPusherConfig();
@@ -64,7 +75,15 @@ public static DataSegmentKiller createKiller(WriterContext param) {
if (param.isLocalDeepStorage()) {
return new LocalDataSegmentKiller(getLocalConfig(param.getLocalDir()));
} else if (param.isHdfsDeepStorage()) {
return new HdfsDataSegmentKiller(new Configuration(), getHdfsConfig(param.getHdfsStorageDir()));
return new HdfsDataSegmentKiller(
getHdfsHadoopConfiguration(param.getHdfsCoreSitePath(), param.getHdfsHdfsSitePath(), param.getHdfsDefaultFS()),
getHdfsConfig(
param.getHdfsDir(),
param.getHdfsSecurityKerberosPrincipal(),
param.getHdfsSecurityKerberosKeytab(),
getHdfsHadoopConfiguration(param.getHdfsCoreSitePath(), param.getHdfsHdfsSitePath(), param.getHdfsDefaultFS())
)
);
} else {
Supplier<ServerSideEncryptingAmazonS3> serverSideEncryptingAmazonS3 = getAmazonS3();
S3DataSegmentPusherConfig s3Config = new S3DataSegmentPusherConfig();
@@ -92,11 +111,32 @@ private static LocalDataSegmentPusherConfig getLocalConfig(String localDir) {
}).get();
}

private static HdfsDataSegmentPusherConfig getHdfsConfig(String hdfsStorageDir) {
private static HdfsDataSegmentPusherConfig getHdfsConfig(String hdfsDir, String kerberosPrincipal, String kerberosKeytab, Configuration conf) {
return Suppliers.memoize(() -> {
HdfsDataSegmentPusherConfig hdfsSegmentPusherConfig = new HdfsDataSegmentPusherConfig();
hdfsSegmentPusherConfig.setStorageDirectory(hdfsStorageDir);
return hdfsSegmentPusherConfig;
HdfsDataSegmentPusherConfig config = new HdfsDataSegmentPusherConfig();
if (hdfsDir != null) {
config.setStorageDirectory(hdfsDir);
}
if (kerberosPrincipal != null && kerberosKeytab != null) {
HdfsKerberosConfig hdfsKerberosConfig = new HdfsKerberosConfig(kerberosPrincipal, kerberosKeytab);
HdfsStorageAuthentication hdfsAuth = new HdfsStorageAuthentication(hdfsKerberosConfig, conf);
hdfsAuth.authenticate();
}
return config;
}).get();
}

private static Configuration getHdfsHadoopConfiguration(String hdfsCoreSitePath, String hdfsHdfsSitePath, String defaultFS) {
return Suppliers.memoize(() -> {
if (hdfsCoreSitePath == null || hdfsHdfsSitePath == null) {
throw new UnsupportedOperationException("Custom hdfs site configuration is not implemented");
} else {
Configuration configuration = new Configuration(true);
if (defaultFS != null) {
configuration.set("fs.defaultFS", defaultFS);
}
return configuration;
}
}).get();
}
}
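
For context (an assumption about Druid internals, not code from this change): `HdfsStorageAuthentication.authenticate()` is expected to perform the keytab login against the supplied Hadoop `Configuration`, roughly equivalent to logging in directly with Hadoop's `UserGroupInformation`:

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;

public final class KerberosLoginSketch {
    // Roughly what a keytab-based login amounts to; the Configuration must have
    // hadoop.security.authentication=kerberos (e.g. via core-site.xml) for this to take effect.
    static void login(Configuration conf, String principal, String keytab) throws IOException {
        UserGroupInformation.setConfiguration(conf);
        if (UserGroupInformation.isSecurityEnabled()) {
            UserGroupInformation.loginUserFromKeytab(principal, keytab);
        }
    }
}
```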
