Skip to content

Commit

Permalink
Encryption sample (#16)
Browse files Browse the repository at this point in the history
Fixes #5
  • Loading branch information
cretz authored Sep 7, 2022
1 parent 7f2f9b7 commit ac44173
Show file tree
Hide file tree
Showing 8 changed files with 1,042 additions and 68 deletions.
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ Prerequisites:

With this repository cloned, run the following at the root of the directory:

poetry install --no-root
poetry install

That loads all dependencies. Then to run a sample, usually you just run it in Python. For example:
That loads all required dependencies. Then to run a sample, usually you just run it in Python. For example:

poetry run python hello/hello_activity.py

See each sample's directory for specific instructions.
Some examples require extra dependencies. See each sample's directory for specific instructions.

## Samples

Expand Down Expand Up @@ -53,3 +53,4 @@ See each sample's directory for specific instructions.
while running.
* [hello_signal](hello/hello_signal.py) - Send signals to a workflow.
* [activity_worker](activity_worker) - Use Python activities from a workflow in another language.
* [encryption](encryption) - Apply end-to-end encryption for all input/output.
54 changes: 54 additions & 0 deletions encryption/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Encryption Sample

This sample shows how to make an encryption codec for end-to-end encryption. It is built to work with the encryption
samples [in TypeScript](https://github.com/temporalio/samples-typescript/tree/main/encryption) and
[in Go](https://github.com/temporalio/samples-go/tree/main/encryption).

For this sample, the optional `encryption` dependency group must be included. To include, run:

poetry install --with encryption

To run, first see [README.md](../README.md) for prerequisites. Then, run the following from this directory to start the
worker:

poetry run python worker.py

This will start the worker. Then, in another terminal, run the following to execute the workflow:

poetry run python starter.py

The workflow should complete with the hello result. To view the workflow, use [tctl](https://docs.temporal.io/tctl/):

tctl workflow show --workflow_id encryption-workflow-id

Note how the input/result look like (with wrapping removed):

```
Input:[encoding binary/encrypted: payload encoding is not supported]
...
Result:[encoding binary/encrypted: payload encoding is not supported]
```

This is because the data is encrypted and not visible. To make data visible to external Temporal tools like `tctl` and
the UI, start a codec server in another terminal:

poetry run python codec_server.py

Now with that running, run `tctl` again with the codec endpoint:

tctl --codec_endpoint http://localhost:8081 workflow show --workflow_id encryption-workflow-id

Notice now the output has the unencrypted values:

```
Input:["Temporal"]
...
Result:["Hello, Temporal"]
```

This decryption did not leave the local machine here.

Same case with the web UI. If you go to the web UI, you'll only see encrypted input/results. But, assuming your web UI
is at `http://localhost:8080`, if you set the "Remote Codec Endpoint" in the web UI to `http://localhost:8081` you can
then see the unencrypted results. This is possible because CORS settings in the codec server allow the browser to access
the codec server directly over localhost. They can be changed to suit Temporal cloud web UI instead if necessary.
56 changes: 56 additions & 0 deletions encryption/codec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import base64
import os
from typing import Iterable, List

from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from temporalio.api.common.v1 import Payload
from temporalio.converter import PayloadCodec

default_key = base64.b64decode(b"MkUb3RVdHQuOTedqETZW7ra2GkZqpBRmYWRACUospMc=")
default_key_id = "my-key"


class EncryptionCodec(PayloadCodec):
def __init__(self, key_id: str = default_key_id, key: bytes = default_key) -> None:
super().__init__()
self.key_id = key_id
# We are using direct AESGCM to be compatible with samples from
# TypeScript and Go. Pure Python samples may prefer the higher-level,
# safer APIs.
self.encryptor = AESGCM(key)

async def encode(self, payloads: Iterable[Payload]) -> List[Payload]:
# We blindly encode all payloads with the key and set the metadata
# saying which key we used
return [
Payload(
metadata={
"encoding": b"binary/encrypted",
"encryption-key-id": self.key_id.encode(),
},
data=self.encrypt(p.SerializeToString()),
)
for p in payloads
]

async def decode(self, payloads: Iterable[Payload]) -> List[Payload]:
ret: List[Payload] = []
for p in payloads:
# Ignore ones w/out our expected encoding
if p.metadata.get("encoding", b"").decode() != "binary/encrypted":
ret.append(p)
continue
# Confirm our key ID is the same
key_id = p.metadata.get("encryption-key-id", b"").decode()
if key_id != self.key_id:
raise ValueError(f"Unrecognized key ID {key_id}")
# Decrypt and append
ret.append(Payload.FromString(self.decrypt(p.data)))
return ret

def encrypt(self, data: bytes) -> bytes:
nonce = os.urandom(12)
return nonce + self.encryptor.encrypt(nonce, data, None)

def decrypt(self, data: bytes) -> bytes:
return self.encryptor.decrypt(data[:12], data[12:], None)
52 changes: 52 additions & 0 deletions encryption/codec_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from functools import partial
from typing import Awaitable, Callable, Iterable, List

from aiohttp import hdrs, web
from google.protobuf import json_format
from temporalio.api.common.v1 import Payload, Payloads

from encryption.codec import EncryptionCodec


def build_codec_server() -> web.Application:
# Cors handler
async def cors_options(req: web.Request) -> web.Response:
resp = web.Response()
if req.headers.get(hdrs.ORIGIN) == "http://localhost:8080":
resp.headers[hdrs.ACCESS_CONTROL_ALLOW_ORIGIN] = "http://localhost:8080"
resp.headers[hdrs.ACCESS_CONTROL_ALLOW_METHODS] = "POST"
resp.headers[hdrs.ACCESS_CONTROL_ALLOW_HEADERS] = "content-type,x-namespace"
return resp

# General purpose payloads-to-payloads
async def apply(
fn: Callable[[Iterable[Payload]], Awaitable[List[Payload]]], req: web.Request
) -> web.Response:
# Read payloads as JSON
assert req.content_type == "application/json"
payloads = json_format.Parse(await req.read(), Payloads())

# Apply
payloads = Payloads(payloads=await fn(payloads.payloads))

# Apply CORS and return JSON
resp = await cors_options(req)
resp.content_type = "application/json"
resp.text = json_format.MessageToJson(payloads)
return resp

# Build app
codec = EncryptionCodec()
app = web.Application()
app.add_routes(
[
web.post("/encode", partial(apply, codec.encode)),
web.post("/decode", partial(apply, codec.decode)),
web.options("/decode", cors_options),
]
)
return app


if __name__ == "__main__":
web.run_app(build_codec_server(), host="127.0.0.1", port=8081)
32 changes: 32 additions & 0 deletions encryption/starter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import asyncio
import dataclasses

import temporalio.converter
from temporalio.client import Client

from encryption.codec import EncryptionCodec
from encryption.worker import GreetingWorkflow


async def main():
# Connect client
client = await Client.connect(
"localhost:7233",
# Use the default converter, but change the codec
data_converter=dataclasses.replace(
temporalio.converter.default(), payload_codec=EncryptionCodec()
),
)

# Run workflow
result = await client.execute_workflow(
GreetingWorkflow.run,
"Temporal",
id=f"encryption-workflow-id",
task_queue="encryption-task-queue",
)
print(f"Workflow result: {result}")


if __name__ == "__main__":
asyncio.run(main())
50 changes: 50 additions & 0 deletions encryption/worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import asyncio
import dataclasses

import temporalio.converter
from temporalio import workflow
from temporalio.client import Client
from temporalio.worker import Worker

from encryption.codec import EncryptionCodec


@workflow.defn
class GreetingWorkflow:
@workflow.run
async def run(self, name: str) -> str:
return f"Hello, {name}"


interrupt_event = asyncio.Event()


async def main():
# Connect client
client = await Client.connect(
"localhost:7233",
# Use the default converter, but change the codec
data_converter=dataclasses.replace(
temporalio.converter.default(), payload_codec=EncryptionCodec()
),
)

# Run a worker for the workflow
async with Worker(
client,
task_queue="encryption-task-queue",
workflows=[GreetingWorkflow],
):
# Wait until interrupted
print("Worker started, ctrl+c to exit")
await interrupt_event.wait()
print("Shutting down")


if __name__ == "__main__":
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(main())
except KeyboardInterrupt:
interrupt_event.set()
loop.run_until_complete(loop.shutdown_asyncgens())
Loading

0 comments on commit ac44173

Please sign in to comment.