Skip to content

Commit

Permalink
feat: convert pdf to image concurrently (#818)
Browse files Browse the repository at this point in the history
Because

- it is slow when pdf to image

This commit

- make converter concurrently
  • Loading branch information
chuang8511 committed Nov 14, 2024
1 parent 495c43e commit f6456e3
Show file tree
Hide file tree
Showing 5 changed files with 187 additions and 25 deletions.
79 changes: 79 additions & 0 deletions pkg/component/operator/document/v0/convert_to_images_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package document

import (
"context"
"encoding/base64"
"fmt"
"os"
"testing"

qt "github.com/frankban/quicktest"

"github.com/instill-ai/pipeline-backend/pkg/component/base"
"github.com/instill-ai/pipeline-backend/pkg/component/internal/mock"
"github.com/instill-ai/pipeline-backend/pkg/data"
"github.com/instill-ai/pipeline-backend/pkg/data/format"
)

func Test_ConvertDocumentToImages(t *testing.T) {
c := qt.New(t)

test := struct {
name string
filepath string
expectedLen int
}{
name: "Convert PDF to Images",
filepath: "testdata/test.pdf",
expectedLen: 1,
}

bc := base.Component{}
ctx := context.Background()

component := Init(bc)
c.Assert(component, qt.IsNotNil)

execution, err := component.CreateExecution(base.ComponentExecution{
Component: component,
Task: taskConvertToImages,
})
c.Assert(err, qt.IsNil)
c.Assert(execution, qt.IsNotNil)

fileContent, err := os.ReadFile(test.filepath)
c.Assert(err, qt.IsNil)

base64DataURI := fmt.Sprintf("data:%s;base64,%s", mimeTypeByExtension(test.filepath), base64.StdEncoding.EncodeToString(fileContent))

ir, ow, eh, job := mock.GenerateMockJob(c)

ir.ReadDataMock.Times(1).Set(func(ctx context.Context, input any) error {
switch input := input.(type) {
case *ConvertDocumentToImagesInput:
*input = ConvertDocumentToImagesInput{
Document: func() format.Document {
doc, err := data.NewDocumentFromURL(base64DataURI)
if err != nil {
return nil
}
return doc
}(),
}
}
return nil
})

ow.WriteDataMock.Times(1).Set(func(ctx context.Context, output any) error {
switch output := output.(type) {
case *ConvertDocumentToImagesOutput:
mock.Equal(len(output.Images), test.expectedLen)
}
return nil
})

eh.ErrorMock.Optional()

err = execution.Execute(ctx, []*base.Job{job})
c.Assert(err, qt.IsNil)
}
3 changes: 3 additions & 0 deletions pkg/component/operator/document/v0/transformer/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,7 @@ var (

//go:embed execution/pdf_checker.py
pdfChecker string

//go:embed execution/get_page_numbers.py
getPageNumbersExecution string
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from io import BytesIO
import json
import sys
import base64
import pdfplumber

if __name__ == "__main__":
try:
json_str = sys.stdin.buffer.read().decode('utf-8')
params = json.loads(json_str)
pdf_string = params["PDF"]
decoded_bytes = base64.b64decode(pdf_string)
pdf_file_obj = BytesIO(decoded_bytes)

pdf = pdfplumber.open(pdf_file_obj)

output = {
"page_numbers": len(pdf.pages)
}

print(json.dumps(output))
except Exception as e:
print(json.dumps({"error": str(e)}))
Original file line number Diff line number Diff line change
Expand Up @@ -2,41 +2,39 @@
import json
import base64
import sys
import pdfplumber

# TODO chuang8511:
# Deal with the import error when running the code in the docker container.
# Now, we combine all python code into one file to avoid the import error.
# from pdf_to_markdown import PDFTransformer
# from pdf_to_markdown import PageImageProcessor

if __name__ == "__main__":
json_str = sys.stdin.buffer.read().decode('utf-8')
params = json.loads(json_str)
filename = params["filename"]
pdf_string = params["PDF"]
page_idx = params["page_idx"]
if "resolution" in params and params["resolution"] != 0 and params["resolution"] != None:
resolution = params["resolution"]
else:
resolution = 500

decoded_bytes = base64.b64decode(pdf_string)
pdf_file_obj = BytesIO(decoded_bytes)
pdf = PDFTransformer(x=pdf_file_obj)
pages = pdf.raw_pages
exclude_file_extension = filename.split(".")[0]
filenames = []
images = []
pdf = pdfplumber.open(pdf_file_obj)

page = pdf.pages[page_idx]

for i, page in enumerate(pages):
page_image = page.to_image(resolution=resolution)
encoded_image = PageImageProcessor.encode_image(page_image)
images.append(encoded_image)
filenames.append(f"{exclude_file_extension}_{i}.png")
page_image = page.to_image(resolution=resolution)
encoded_image = PageImageProcessor.encode_image(page_image)

exclude_file_extension = filename.split(".")[0]
filename = f"{exclude_file_extension}_{page_idx}.png"

output = {
"images": images,
"filename": filenames,
"image": encoded_image,
"filename": filename,
}

print(json.dumps(output))
83 changes: 71 additions & 12 deletions pkg/component/operator/document/v0/transformer/images.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"strings"
"sync"

"github.com/instill-ai/pipeline-backend/pkg/component/internal/util"
)
Expand All @@ -19,6 +20,16 @@ type ConvertDocumentToImagesTransformerOutput struct {
Filenames []string `json:"filenames"`
}

type pageNumbers struct {
PageNumbers int `json:"page_numbers"`
Error string `json:"error"`
}

type pageImage struct {
Image string `json:"image"`
Filename string `json:"filename"`
}

func ConvertDocumentToImage(inputStruct *ConvertDocumentToImagesTransformerInput) (*ConvertDocumentToImagesTransformerOutput, error) {

contentType, err := util.GetContentTypeFromBase64(inputStruct.Document)
Expand Down Expand Up @@ -53,27 +64,75 @@ func ConvertDocumentToImage(inputStruct *ConvertDocumentToImagesTransformerInput
base64PDFWithoutMime = util.TrimBase64Mime(base64PDF)
}

paramsJSON := map[string]interface{}{
"PDF": base64PDFWithoutMime,
"filename": inputStruct.Filename,
"resolution": inputStruct.Resolution,
getNumberJSON := map[string]interface{}{
"PDF": base64PDFWithoutMime,
}

pythonCode := imageProcessor + pdfTransformer + taskConvertToImagesExecution
pageNumbersBytes, err := util.ExecutePythonCode(getPageNumbersExecution, getNumberJSON)
if err != nil {
return nil, fmt.Errorf("get page numbers: %w", err)
}

outputBytes, err := util.ExecutePythonCode(pythonCode, paramsJSON)
var pageNumbers pageNumbers
err = json.Unmarshal(pageNumbersBytes, &pageNumbers)
if err != nil || pageNumbers.Error != "" {
return nil, fmt.Errorf("unmarshal page numbers: %w, %s", err, pageNumbers.Error)
}

if err != nil {
return nil, fmt.Errorf("failed to run python script: %w", err)
if pageNumbers.PageNumbers == 0 {
return &ConvertDocumentToImagesTransformerOutput{
Images: []string{},
Filenames: []string{},
}, nil
}

output := ConvertDocumentToImagesTransformerOutput{}
pythonCode := imageProcessor + pdfTransformer + taskConvertToImagesExecution

err = json.Unmarshal(outputBytes, &output)
// We will make this number tunable & configurable in the future.
maxWorkers := 5

if err != nil {
return nil, fmt.Errorf("failed to unmarshal output: %w", err)
jobs := make(chan int, pageNumbers.PageNumbers)
output := ConvertDocumentToImagesTransformerOutput{
Images: make([]string, pageNumbers.PageNumbers),
Filenames: make([]string, pageNumbers.PageNumbers),
}

// Create workers
wg := sync.WaitGroup{}
for w := 0; w < maxWorkers; w++ {
wg.Add(1)
go func() {
defer wg.Done()
for pageIdx := range jobs {
paramsJSON := map[string]interface{}{
"PDF": base64PDFWithoutMime,
"filename": inputStruct.Filename,
"resolution": inputStruct.Resolution,
"page_idx": pageIdx,
}
outputBytes, err := util.ExecutePythonCode(pythonCode, paramsJSON)
if err != nil {
continue
}
var pageImage pageImage
err = json.Unmarshal(outputBytes, &pageImage)
if err != nil {
continue
}
output.Images[pageIdx] = pageImage.Image
output.Filenames[pageIdx] = pageImage.Filename
}
}()
}

// Send jobs to workers
for i := 0; i < pageNumbers.PageNumbers; i++ {
jobs <- i
}
close(jobs)

// Wait for all workers to complete
wg.Wait()

if len(output.Filenames) == 0 {
output.Filenames = []string{}
Expand Down

0 comments on commit f6456e3

Please sign in to comment.