Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gcs poc multi #52

Open
wants to merge 3 commits into
base: develop
Choose a base branch
from
Open

Gcs poc multi #52

wants to merge 3 commits into from

Conversation

psainics
Copy link
Collaborator

@psainics psainics commented Apr 12, 2024

WIP

We create a _temp dir on the root path and store all the paths,
At the final driver commit we take the set on all the paths and do one commit on each path !

@psainics psainics requested a review from vikasrathee-cs April 12, 2024 04:18
@psainics psainics self-assigned this Apr 12, 2024
Copy link

@tivv tivv left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The biggest challenge with this approach is to test all possible failure cases. I am wondering if we should look on how spark itself does it and make a copy of all test cases

Path tempPath = new Path(outputPath, FileOutputCommitter.PENDING_DIR_NAME);
FileSystem fs = tempPath.getFileSystem(context.getConfiguration());
String taskId = context.getTaskAttemptID().getTaskID().toString();
Path taskPartitionFile = new Path(tempPath, taskId + "_partitions.txt");
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it's a task retry we should overwrite it

if (!fs.exists(taskPartitionFile)) {
fs.createNewFile(taskPartitionFile);
}
DataOutputStream out = fs.append(taskPartitionFile);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use try with resources to ensure stream is always closed

DataInputStream in = new DataInputStream(new BufferedInputStream(dis));
BufferedReader br = new BufferedReader(new java.io.InputStreamReader(in));
String line;
while ((line = br.readLine()) != null) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There should never be more than 1 line

Copy link
Collaborator

@vikasrathee-cs vikasrathee-cs Apr 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can have multiple lines if that split has records for multiple table names. E.g. We have country as tableName, and split has records for both US and India, it will write path upto country/suffix into this file.

Copy link
Collaborator Author

@psainics psainics Apr 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one task can be responsible for processing multiple partitions, hence there can be multiple lines.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got you. Well, in this case we need to ensure file is removed on the beginning of attempt (in case of any attempt retries)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants