[ADR] Readers / Writers Multithreading #1613

Merged 4 commits on Sep 23, 2016
85 changes: 85 additions & 0 deletions docs/arch/adr-0003-rw-multithreading.md
@@ -0,0 +1,85 @@
# 0003 - Readers / Writers Multithreading

## Context

Not all GeoTrellis readers and writers are implemented as MapReduce jobs (Accumulo `RDDReader`, Hadoop `RDDReaders`); some use socket reads as well. The socket approach makes it possible to choose the parallelism level based on the system configuration (CPU, RAM, file system). For `RDDReaders` that means the number of threads per RDD partition; for `CollectionReaders`, the number of threads for the whole collection.
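
For illustration, here is a minimal sketch of the collection-read case, fanning reads out over a bounded thread pool so that the parallelism level is a tunable setting. This is not the actual GeoTrellis API; `readOne` and `keys` are hypothetical stand-ins.

```scala
import java.util.concurrent.Executors
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

object BoundedReads {
  // Sketch: read every key on a fixed-size pool of `threads` workers,
  // so parallelism is controlled by configuration rather than data size.
  def readAll[K, V](keys: Seq[K], threads: Int)(readOne: K => V): Seq[V] = {
    val pool = Executors.newFixedThreadPool(threads)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try Await.result(Future.sequence(keys.map(k => Future(readOne(k)))), 10.minutes)
    finally pool.shutdown()
  }
}
```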

All numbers below are empirical rather than backed by strong theory. The test cluster runs in a local network to exclude possible network issues. Reads were tested at ~900 Landsat tile objects per read request ([test project](https://github.com/geotrellis/geotrellis-landsat-emr-demo)).

**Test cluster**:

* Apache Spark 1.6.2
* Apache Hadoop 2.7.2
* Apache Accumulo 1.7.1
* Cassandra 3.7

## Decision

Function call performance was benchmarked depending on the RAM and CPU cores available.

### File Backend

For `FileCollectionReader`, the optimal (or at least reasonable in most cases) pool size equals the number of CPU cores. File system restrictions may also apply, depending on the particular FS settings. A sketch of this sizing follows the list below.

* _collection.reader: number of CPU cores available to the virtual machine_
* _rdd.reader / writer: number of CPU cores available to the virtual machine_
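
A one-line sketch of that sizing rule, as seen from the JVM:

```scala
// Sketch: size the file-backend pool to the CPU cores the JVM sees.
val fileThreads: Int = Runtime.getRuntime.availableProcessors
```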

### Hadoop Backend

With the `Hadoop` backend we can use up to 16 threads without a really significant increase in memory usage, since `HadoopCollectionReader` keeps up to 16 `MapFile.Reader`s cached by default (by design). However, using more than 16 threads would not improve performance significantly; see the sketch after the list below.

* _collection.reader: number of CPU cores available to the virtual machine_
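
A sketch of a cap that follows from the 16-reader cache (illustrative, not GeoTrellis code):

```scala
// Sketch: beyond 16 threads adds little, as only 16 MapFile.Readers
// are cached; cap the pool accordingly.
val hadoopThreads: Int = math.min(Runtime.getRuntime.availableProcessors, 16)
```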

### S3 Backend

The number of `S3` threads is limited only by backpressure; the cap below is an empirical value chosen to maximize performance without producing lots of useless failed requests (see the sketch after the list).

* _collection.reader: number of CPU cores available to the virtual machine, <= 8_
* _rdd.reader / writer: number of CPU cores available to the virtual machine, <= 8_
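
A sketch of the recommended clamp (illustrative):

```scala
// Sketch: cap S3 reader/writer threads at 8 to avoid backpressure-induced
// failed requests, per the empirical results above.
val s3Threads: Int = math.min(Runtime.getRuntime.availableProcessors, 8)
```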

### Accumulo Backend

The numbers in the tables below are averages over warmed-up calls. Similar results hold for all supported backends, and the configuration property that really matters for performance is the number of available CPU cores. Results:

*4 CPU cores result (m3.xlarge):*

Threads | Reads time (ms) | Comment
--------|-----------------|--------
4       | ~15,541         | -
8       | ~18,541         | ~500 MB more RAM used than the previous row
32      | ~20,120         | ~500 MB more RAM used than the previous row

*8 CPU cores result (m3.2xlarge):*

Threads | Reads time (ms) | Comment
--------|-----------------|--------
4       | ~12,532         | -
8       | ~9,541          | ~500 MB more RAM used than the previous row
32      | ~10,610         | ~500 MB more RAM used than the previous row

* _collection.reader: number of CPU cores available to the virtual machine_

### Cassandra Backend

*4 CPU cores result (m3.xlarge):*

Threads | Reads time (ms) | Comment
--------|-----------------|--------
4       | ~7,622          | -
8       | ~9,511          | Higher load on the driver node; ~500 MB more RAM used than the previous row
32      | ~13,261         | Higher load on the driver node; ~500 MB more RAM used than the previous row

*8 CPU cores result (m3.2xlarge):*

Threads | Reads time (ms) | Comment
--------|-----------------|--------
4       | ~8,100          | -
8       | ~4,541          | Higher load on the driver node; ~500 MB more RAM used than the previous row
32      | ~7,610          | Higher load on the driver node; ~500 MB more RAM used than the previous row

* _collection.reader: number of CPU cores available to the virtual machine_
* _rdd.reader / writer: number of CPU cores available to the virtual machine_

## Conclusion

For all backends, performance results are quite similar to the `Accumulo` and `Cassandra` numbers above; to avoid duplicating data, those tables were omitted. Thread pool size depends mostly on the available CPU cores and less on RAM. To avoid losing performance, do not use more threads than the CPU cores available to the Java virtual machine; otherwise a significant performance loss can follow.
40 changes: 17 additions & 23 deletions docs/spark/spark-io.md
@@ -204,9 +204,9 @@ The idea is similar to the `LayerReader.reader` method except in this case we're
## Readers threads

Cassandra and S3 Layer `RDDReaders` / `RDDWriters` are configurable by the number of threads. It is a program setting that can differ per machine, depending on the available resources. The configuration can be set in the `reference.conf` / `application.conf` file of your app; default settings are available in the `reference.conf` file of each backend subproject (we use [TypeSafe Config](https://github.com/typesafehub/config)).
For the File backend only the RDDReader is configurable; for Accumulo, only the RDDWriter (Socket Strategy). For all backends, CollectionReaders are configurable as well. By default, the thread pool size for each configurable reader / writer equals the number of CPU cores available to the virtual machine. The word `default` means one thread per CPU core; it can be changed to any integer value.

Default configuration example:

```conf
geotrellis.accumulo.threads {
  collection.read = default
  rdd.write = default
}
geotrellis.file.threads {
  collection.read = default
  rdd.read = default
}
geotrellis.hadoop.threads {
  collection.read = default
}
geotrellis.cassandra.threads {
  collection.read = default
  rdd {
    write = default
    read = default
  }
}
geotrellis.s3.threads {
  collection.read = default
  rdd {
    write = default
    read = default
  }
}
```
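
As an illustration of how such a setting might be resolved with TypeSafe Config, here is a minimal sketch (`ThreadSettings.threadsFromConfig` is a hypothetical helper, not part of the GeoTrellis API):

```scala
import com.typesafe.config.ConfigFactory

object ThreadSettings {
  // Sketch: "default" means one thread per CPU core visible to the JVM;
  // any other value is taken as a literal integer.
  def threadsFromConfig(path: String): Int =
    ConfigFactory.load().getString(path) match {
      case "default" => Runtime.getRuntime.availableProcessors
      case n         => n.toInt
    }
}

// e.g. ThreadSettings.threadsFromConfig("geotrellis.s3.threads.collection.read")
```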

Cassandra has additional configuration settings:

```conf
geotrellis.cassandra {
  keyspace = "geotrellis"
  replicationStrategy = "SimpleStrategy"
  replicationFactor = 1
  localDc = "datacenter1"
  usedHostsPerRemoteDc = 0
  allowRemoteDCsForLocalConsistencyLevel = false
}
```

Consider using the `hbase.client.scanner.caching` parameter for `HBase`, as it may increase scan performance.
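
For example, via the HBase client configuration (the value shown is illustrative; tune it to your row size and client memory):

```scala
import org.apache.hadoop.hbase.HBaseConfiguration

object HBaseScanTuning {
  // Sketch: a larger scanner cache fetches more rows per RPC, trading
  // client memory for fewer round trips.
  val conf = HBaseConfiguration.create()
  conf.set("hbase.client.scanner.caching", "500")
}
```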