Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds LangChain translation example #100

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ Some examples require extra dependencies. See each sample's directory for specif
* [dsl](dsl) - DSL workflow that executes steps defined in a YAML file.
* [encryption](encryption) - Apply end-to-end encryption for all input/output.
* [gevent_async](gevent_async) - Combine gevent and Temporal.
* [langchain](langchain) - Orchestrate workflows for LangChain.
* [open_telemetry](open_telemetry) - Trace workflows with OpenTelemetry.
* [patching](patching) - Alter workflows safely with `patch` and `deprecate_patch`.
* [polling](polling) - Recommended implementation of an activity that needs to periodically poll an external resource waiting its successful completion.
Expand Down
28 changes: 28 additions & 0 deletions langchain/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# LangChain Sample

This sample shows you how you can use Temporal to orchestrate workflows for [LangChain](https://www.langchain.com).

For this sample, the optional `langchain` dependency group must be included. To include, run:

poetry install --with langchain

Export your [OpenAI API key](https://platform.openai.com/api-keys) as an environment variable. Replace `YOUR_API_KEY` with your actual OpenAI API key.

export OPENAI_API_KEY='...'

To run, first see [README.md](../README.md) for prerequisites. Then, run the following from this directory to start the
worker:

poetry run python worker.py

This will start the worker. Then, in another terminal, run the following to execute a workflow:

poetry run python starter.py

Then, in another terminal, run the following command to translate a phrase:

curl -X POST "http://localhost:8000/translate?phrase=hello%20world&language=Spanish"

Which should produce some output like:

{"translation":"Hola mundo"}
29 changes: 29 additions & 0 deletions langchain/activities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from dataclasses import dataclass

from temporalio import activity

from langchain.chat_models import ChatOpenAI
from langchain.prompts import ChatPromptTemplate


@dataclass
class TranslateParams:
phrase: str
language: str


@activity.defn
async def translate_phrase(params: TranslateParams) -> dict:
# LangChain setup
template = """You are a helpful assistant who translates between languages.
Translate the following phrase into the specified language: {phrase}
Language: {language}"""
chat_prompt = ChatPromptTemplate.from_messages(
[
("system", template),
("human", "Translate"),
]
)
chain = chat_prompt | ChatOpenAI()
# Use the asynchronous invoke method
return await chain.ainvoke({"phrase": params.phrase, "language": params.language})
37 changes: 37 additions & 0 deletions langchain/starter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from contextlib import asynccontextmanager

import uvicorn
from activities import TranslateParams
from fastapi import FastAPI, HTTPException
from temporalio.client import Client
from workflow import LangChainWorkflow


@asynccontextmanager
async def lifespan(app: FastAPI):
app.state.temporal_client = await Client.connect("localhost:7233")
yield


app = FastAPI(lifespan=lifespan)


@app.post("/translate")
async def translate(phrase: str, language: str):
client = app.state.temporal_client
try:
result = await client.execute_workflow(
LangChainWorkflow.run,
TranslateParams(phrase, language),
id=f"langchain-translation-{language.lower()}-{phrase.replace(' ', '-')}",
rachfop marked this conversation as resolved.
Show resolved Hide resolved
task_queue="langchain-task-queue",
)
translation_content = result.get("content", "Translation not available")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))

return {"translation": translation_content}


if __name__ == "__main__":
uvicorn.run(app, host="localhost", port=8000)
37 changes: 37 additions & 0 deletions langchain/worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import asyncio

from activities import translate_phrase
from temporalio.client import Client
from temporalio.worker import Worker
from workflow import LangChainWorkflow

interrupt_event = asyncio.Event()


async def main():
client = await Client.connect("localhost:7233")
worker = Worker(
client,
task_queue="langchain-task-queue",
workflows=[LangChainWorkflow],
activities=[translate_phrase],
)

print("\nWorker started, ctrl+c to exit\n")
await worker.run()
try:
# Wait indefinitely until the interrupt event is set
await interrupt_event.wait()
finally:
# The worker will be shutdown gracefully due to the async context manager
print("\nShutting down the worker\n")


if __name__ == "__main__":
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
except KeyboardInterrupt:
print("\nInterrupt received, shutting down...\n")
interrupt_event.set()
loop.run_until_complete(loop.shutdown_asyncgens())
17 changes: 17 additions & 0 deletions langchain/workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from datetime import timedelta

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
from activities import TranslateParams, translate_phrase


@workflow.defn
class LangChainWorkflow:
@workflow.run
async def run(self, params: TranslateParams) -> dict:
return await workflow.execute_activity(
translate_phrase,
params,
schedule_to_close_timeout=timedelta(seconds=30),
)
Loading
Loading