Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: convert pdf to image concurrently #818

Merged
merged 2 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions pkg/component/operator/document/v0/convert_to_images_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package document

import (
"context"
"encoding/base64"
"fmt"
"os"
"testing"

qt "github.com/frankban/quicktest"

"github.com/instill-ai/pipeline-backend/pkg/component/base"
"github.com/instill-ai/pipeline-backend/pkg/component/internal/mock"
"github.com/instill-ai/pipeline-backend/pkg/data"
"github.com/instill-ai/pipeline-backend/pkg/data/format"
)

func Test_ConvertDocumentToImages(t *testing.T) {
c := qt.New(t)

test := struct {
name string
filepath string
expectedLen int
}{
name: "Convert PDF to Images",
filepath: "testdata/test.pdf",
expectedLen: 1,
}

bc := base.Component{}
ctx := context.Background()

component := Init(bc)
c.Assert(component, qt.IsNotNil)

execution, err := component.CreateExecution(base.ComponentExecution{
Component: component,
Task: taskConvertToImages,
})
c.Assert(err, qt.IsNil)
c.Assert(execution, qt.IsNotNil)

fileContent, err := os.ReadFile(test.filepath)
c.Assert(err, qt.IsNil)

base64DataURI := fmt.Sprintf("data:%s;base64,%s", mimeTypeByExtension(test.filepath), base64.StdEncoding.EncodeToString(fileContent))

ir, ow, eh, job := mock.GenerateMockJob(c)

ir.ReadDataMock.Times(1).Set(func(ctx context.Context, input any) error {
switch input := input.(type) {
case *ConvertDocumentToImagesInput:
*input = ConvertDocumentToImagesInput{
Document: func() format.Document {
doc, err := data.NewDocumentFromURL(base64DataURI)
if err != nil {
return nil
}
return doc
}(),
}
}
return nil
})

ow.WriteDataMock.Times(1).Set(func(ctx context.Context, output any) error {
switch output := output.(type) {
case *ConvertDocumentToImagesOutput:
mock.Equal(len(output.Images), test.expectedLen)
}
return nil
})

eh.ErrorMock.Optional()

err = execution.Execute(ctx, []*base.Job{job})
c.Assert(err, qt.IsNil)
}
3 changes: 3 additions & 0 deletions pkg/component/operator/document/v0/transformer/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,7 @@ var (

//go:embed execution/pdf_checker.py
pdfChecker string

//go:embed execution/get_page_numbers.py
getPageNumbersExecution string
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from io import BytesIO
import json
import sys
import base64
import pdfplumber

if __name__ == "__main__":
try:
json_str = sys.stdin.buffer.read().decode('utf-8')
params = json.loads(json_str)
pdf_string = params["PDF"]
decoded_bytes = base64.b64decode(pdf_string)
pdf_file_obj = BytesIO(decoded_bytes)

pdf = pdfplumber.open(pdf_file_obj)

output = {
"page_numbers": len(pdf.pages)
}

print(json.dumps(output))
except Exception as e:
print(json.dumps({"error": str(e)}))
Original file line number Diff line number Diff line change
Expand Up @@ -2,41 +2,39 @@
import json
import base64
import sys
import pdfplumber

# TODO chuang8511:
# Deal with the import error when running the code in the docker container.
# Now, we combine all python code into one file to avoid the import error.
# from pdf_to_markdown import PDFTransformer
# from pdf_to_markdown import PageImageProcessor

if __name__ == "__main__":
json_str = sys.stdin.buffer.read().decode('utf-8')
params = json.loads(json_str)
filename = params["filename"]
pdf_string = params["PDF"]
page_idx = params["page_idx"]
if "resolution" in params and params["resolution"] != 0 and params["resolution"] != None:
resolution = params["resolution"]
else:
resolution = 500

decoded_bytes = base64.b64decode(pdf_string)
pdf_file_obj = BytesIO(decoded_bytes)
pdf = PDFTransformer(x=pdf_file_obj)
pages = pdf.raw_pages
exclude_file_extension = filename.split(".")[0]
filenames = []
images = []
pdf = pdfplumber.open(pdf_file_obj)

page = pdf.pages[page_idx]

for i, page in enumerate(pages):
page_image = page.to_image(resolution=resolution)
encoded_image = PageImageProcessor.encode_image(page_image)
images.append(encoded_image)
filenames.append(f"{exclude_file_extension}_{i}.png")
page_image = page.to_image(resolution=resolution)
encoded_image = PageImageProcessor.encode_image(page_image)

exclude_file_extension = filename.split(".")[0]
filename = f"{exclude_file_extension}_{page_idx}.png"

output = {
"images": images,
"filename": filenames,
"image": encoded_image,
"filename": filename,
}

print(json.dumps(output))
83 changes: 71 additions & 12 deletions pkg/component/operator/document/v0/transformer/images.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"strings"
"sync"

"github.com/instill-ai/pipeline-backend/pkg/component/internal/util"
)
Expand All @@ -19,6 +20,16 @@ type ConvertDocumentToImagesTransformerOutput struct {
Filenames []string `json:"filenames"`
}

type pageNumbers struct {
PageNumbers int `json:"page_numbers"`
Error string `json:"error"`
}

type pageImage struct {
Image string `json:"image"`
Filename string `json:"filename"`
}

func ConvertDocumentToImage(inputStruct *ConvertDocumentToImagesTransformerInput) (*ConvertDocumentToImagesTransformerOutput, error) {

contentType, err := util.GetContentTypeFromBase64(inputStruct.Document)
Expand Down Expand Up @@ -53,27 +64,75 @@ func ConvertDocumentToImage(inputStruct *ConvertDocumentToImagesTransformerInput
base64PDFWithoutMime = util.TrimBase64Mime(base64PDF)
}

paramsJSON := map[string]interface{}{
"PDF": base64PDFWithoutMime,
"filename": inputStruct.Filename,
"resolution": inputStruct.Resolution,
getNumberJSON := map[string]interface{}{
"PDF": base64PDFWithoutMime,
}

pythonCode := imageProcessor + pdfTransformer + taskConvertToImagesExecution
pageNumbersBytes, err := util.ExecutePythonCode(getPageNumbersExecution, getNumberJSON)
if err != nil {
return nil, fmt.Errorf("get page numbers: %w", err)
}

outputBytes, err := util.ExecutePythonCode(pythonCode, paramsJSON)
var pageNumbers pageNumbers
err = json.Unmarshal(pageNumbersBytes, &pageNumbers)
if err != nil || pageNumbers.Error != "" {
return nil, fmt.Errorf("unmarshal page numbers: %w, %s", err, pageNumbers.Error)
}

if err != nil {
return nil, fmt.Errorf("failed to run python script: %w", err)
if pageNumbers.PageNumbers == 0 {
return &ConvertDocumentToImagesTransformerOutput{
Images: []string{},
Filenames: []string{},
}, nil
}

output := ConvertDocumentToImagesTransformerOutput{}
pythonCode := imageProcessor + pdfTransformer + taskConvertToImagesExecution

err = json.Unmarshal(outputBytes, &output)
// We will make this number tunable & configurable in the future.
maxWorkers := 5

if err != nil {
return nil, fmt.Errorf("failed to unmarshal output: %w", err)
jobs := make(chan int, pageNumbers.PageNumbers)
output := ConvertDocumentToImagesTransformerOutput{
Images: make([]string, pageNumbers.PageNumbers),
Filenames: make([]string, pageNumbers.PageNumbers),
}

// Create workers
wg := sync.WaitGroup{}
for w := 0; w < maxWorkers; w++ {
wg.Add(1)
go func() {
defer wg.Done()
for pageIdx := range jobs {
paramsJSON := map[string]interface{}{
"PDF": base64PDFWithoutMime,
"filename": inputStruct.Filename,
"resolution": inputStruct.Resolution,
"page_idx": pageIdx,
}
outputBytes, err := util.ExecutePythonCode(pythonCode, paramsJSON)
if err != nil {
continue
}
var pageImage pageImage
err = json.Unmarshal(outputBytes, &pageImage)
if err != nil {
continue
}
output.Images[pageIdx] = pageImage.Image
output.Filenames[pageIdx] = pageImage.Filename
}
}()
}

// Send jobs to workers
for i := 0; i < pageNumbers.PageNumbers; i++ {
jobs <- i
}
close(jobs)

// Wait for all workers to complete
wg.Wait()

if len(output.Filenames) == 0 {
output.Filenames = []string{}
Expand Down
Loading