
Sourcery refactored main branch #2

Open · sourcery-ai[bot] wants to merge 1 commit into base: main from sourcery/main

Conversation


@sourcery-ai sourcery-ai bot commented Nov 23, 2023

Branch main refactored by Sourcery.

If you're happy with these changes, merge this Pull Request using the Squash and merge strategy.

See our documentation here.

Run Sourcery locally

Reduce the feedback loop during development by using the Sourcery editor plugin.

Review changes via command line

To manually merge these changes, make sure you're on the main branch, then run:

git fetch origin sourcery/main
git merge --ff-only FETCH_HEAD
git reset HEAD^

Help us improve this pull request!

@sourcery-ai sourcery-ai bot requested a review from ludoplex November 23, 2023 15:58

@sourcery-ai sourcery-ai bot left a comment


Sourcery timed out performing refactorings.

Due to GitHub API limits, only the first 60 comments can be shown.

Comment on lines -40 to +45
-    self.extras: dict = {}
-    self.extras['autofe'] = list_requirements("pyrecdp/autofe/requirements.txt")
+    self.extras: dict = {
+        'autofe': list_requirements("pyrecdp/autofe/requirements.txt")
+    }
     self.extras['LLM'] = list_requirements("pyrecdp/LLM/requirements.txt")
     self.extras["all"] = list(set(chain.from_iterable(self.extras.values())))

Function SetupSpec.__init__ refactored: the 'autofe' extras assignment is merged into the dict declaration.
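For reference, a minimal standalone sketch of the resulting pattern; the requirement lists here are invented, while the real ones come from the requirements.txt files:

from itertools import chain

extras: dict = {
    'autofe': ['pandas', 'scikit-learn'],   # hypothetical requirement lists
}
extras['LLM'] = ['transformers', 'pandas']
extras['all'] = list(set(chain.from_iterable(extras.values())))
print(sorted(extras['all']))  # ['pandas', 'scikit-learn', 'transformers']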

Comment on lines -34 to +37
file_names = dict((key, path_prefix + os.path.join(current_path, data_folder, filename)) for key, filename in SO_FILE.items())
file_names = {
key: path_prefix + os.path.join(current_path, data_folder, filename)
for key, filename in SO_FILE.items()
}

Function main refactored: the dict() call over a generator is replaced with a dict comprehension.
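A small self-contained illustration of the dict-comprehension form; the SO_FILE entries and paths below are invented for the example:

import os

SO_FILE = {'recsys': 'librecsys.so', 'categorify': 'libcategorify.so'}  # hypothetical
path_prefix, current_path, data_folder = 'file://', '/tmp', 'lib'
file_names = {
    key: path_prefix + os.path.join(current_path, data_folder, filename)
    for key, filename in SO_FILE.items()
}
print(file_names['recsys'])  # file:///tmp/lib/librecsys.so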

Comment on lines -68 to +79
str_fields1 = [StructField('%s' % i, StringType())
for i in self.string_cols1]
long_fields1 = [StructField('%s' % i, LongType())
for i in self.long_cols1]
str_fields2 = [StructField('%s' % i, StringType())
for i in self.string_cols2]
long_fields2 = [StructField('%s' % i, LongType())
for i in self.long_cols2]
bool_fields1 = [StructField('%s' % i, BooleanType())
for i in self.bool_cols1]
long_fields3 = [StructField('%s' % i, LongType())
for i in self.long_cols3]
str_fields3 = [StructField('%s' % i, StringType())
for i in self.string_cols3]
long_fields4 = [StructField('%s' % i, LongType())
for i in self.long_cols4]
bool_fields2 = [StructField('%s' % i, BooleanType())
for i in self.bool_cols2]
long_fields5 = [StructField('%s' % i, LongType())
for i in self.long_cols5]
bool_fields3 = [StructField('%s' % i, BooleanType())
for i in self.bool_cols3]
double_fields = [StructField('%s' % i, DoubleType())
for i in self.double_cols]
str_fields1 = [StructField(f'{i}', StringType()) for i in self.string_cols1]
long_fields1 = [StructField(f'{i}', LongType()) for i in self.long_cols1]
str_fields2 = [StructField(f'{i}', StringType()) for i in self.string_cols2]
long_fields2 = [StructField(f'{i}', LongType()) for i in self.long_cols2]
bool_fields1 = [StructField(f'{i}', BooleanType()) for i in self.bool_cols1]
long_fields3 = [StructField(f'{i}', LongType()) for i in self.long_cols3]
str_fields3 = [StructField(f'{i}', StringType()) for i in self.string_cols3]
long_fields4 = [StructField(f'{i}', LongType()) for i in self.long_cols4]
bool_fields2 = [StructField(f'{i}', BooleanType()) for i in self.bool_cols2]
long_fields5 = [StructField(f'{i}', LongType()) for i in self.long_cols5]
bool_fields3 = [StructField(f'{i}', BooleanType()) for i in self.bool_cols3]
double_fields = [StructField(f'{i}', DoubleType()) for i in self.double_cols]

Function RecsysSchema.toStructType refactored: the '%s' % i interpolation is replaced with f-strings.
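The change is purely notational: for a column name that is already a string, '%s' % i, f'{i}', and i itself produce the same value, so the resulting StructField is identical. A one-line check (column name is illustrative):

name = "tweet_id"
assert '%s' % name == f'{name}' == name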

Comment on lines -417 to +421

     if len(x)>rw:
         return hashit(x[rw])
     elif rw<0:
-        if len(x)>0:
-            return hashit(x[-1])
-        else:
-            return 0
+        return hashit(x[-1]) if len(x)>0 else 0

Function ret_word refactored: the nested if/else is collapsed into a conditional expression.
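The same simplification in isolation, with a stand-in hashit so the snippet runs on its own:

def hashit(token):          # stand-in for the real hashing helper
    return hash(token) % 1000

def last_or_zero(x):
    # equivalent to: if len(x) > 0: return hashit(x[-1]); else: return 0
    return hashit(x[-1]) if len(x) > 0 else 0

print(last_or_zero([]), last_or_zero(["word"]))  # 0, then a small hash value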

Comment on lines -437 to -445
-    if len(text_split)>1:
-        if text_split[1] in ['_']:
-            uhash += clean_text(text_split[1]) + clean_text(text_split[2])
-            text_split = text_split[2:]
-        else:
-            cl_loop = False
+    if len(text_split) > 1 and text_split[1] in ['_']:
+        uhash += clean_text(text_split[1]) + clean_text(text_split[2])
+        text_split = text_split[2:]
+    else:
+        cl_loop = False


Function extract_hash refactored: the nested ifs are merged into a single if with the conditions combined.
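Ignoring the else branch for brevity, the merge follows the general identity that a nested if with a single body equals one if with the conditions joined by `and`; the tokens below are illustrative:

text_split = ["#", "_", "tag"]
uhash_a = uhash_b = ""

if len(text_split) > 1:            # before
    if text_split[1] in ['_']:
        uhash_a += text_split[1] + text_split[2]

if len(text_split) > 1 and text_split[1] in ['_']:   # after
    uhash_b += text_split[1] + text_split[2]

assert uhash_a == uhash_b == "_tag"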

Comment on lines -1137 to +1157
df = spark.read.parquet(path_prefix+"/recsys2021/datapre_stage1/stage1_valid_all")
df = spark.read.parquet(
f"{path_prefix}/recsys2021/datapre_stage1/stage1_valid_all"
)

Function valid_stage2 refactored: string concatenation for the parquet path is replaced with an f-string.

Comment on lines -1193 to +1238

############# load decoder data
df = spark.read.parquet(path_prefix+current_path+"test1_decode")
print("data decoded!")

############# load dict from stage 1
dict_names = ['tweet', 'mention']
dict_dfs = [{'col_name': name, 'dict': spark.read.parquet(
"%s/%s/%s/%s" % (proc.path_prefix, proc.current_path, proc.dicts_path, name))} for name in dict_names]
dict_dfs = [
{
'col_name': name,
'dict': spark.read.parquet(
f"{proc.path_prefix}/{proc.current_path}/{proc.dicts_path}/{name}"
),
}
for name in dict_names
]
_, te_test_dfs, y_mean_all_df = getTargetEncodingFeaturesDicts(proc, mode='stage1', train_dict_load=False)

############# set up to stage 2
current_path = "/recsys2021/datapre_stage2/"
proc = DataProcessor(spark, path_prefix,
current_path=current_path, dicts_path=dicts_folder, shuffle_disk_capacity="1500GB",spark_mode='local')

############# count encoding
ce_test_dfs = CountEncodingFeatures(df, proc, gen_dict=True,mode="inference",train_generate=False)


Function inference_join refactored: the '%s' path formatting inside the dict build is replaced with an f-string.

ctr = positive/float(len(gt))
return ctr
return positive/float(len(gt))

Function calculate_ctr refactored: the intermediate ctr variable is removed and the expression is returned directly.
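A runnable sketch of the simplified function; the signature is assumed for illustration, and note that under Python 3 the division already yields a float, so float() is kept only for compatibility:

def calculate_ctr(gt, positive):   # hypothetical signature
    return positive / float(len(gt))

print(calculate_ctr([1, 0, 1, 0], 2))  # 0.5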

Comment on lines -56 to +60
feature_list = []
feature_list.append(stage1_reply_features)
feature_list.append(stage1_retweet_features)
feature_list.append(stage1_comment_features)
feature_list.append(stage1_like_features)
feature_list = [
stage1_reply_features,
stage1_retweet_features,
stage1_comment_features,
stage1_like_features,
]

Lines 56-174 refactored: the repeated feature_list.append calls become a single list literal.
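The literal form builds the same nested list as the four append calls; the placeholder feature lists below stand in for the real ones:

stage1_reply_features = ["reply_f1"]
stage1_retweet_features = ["retweet_f1"]
stage1_comment_features = ["comment_f1"]
stage1_like_features = ["like_f1"]

feature_list = [
    stage1_reply_features,
    stage1_retweet_features,
    stage1_comment_features,
    stage1_like_features,
]
print(len(feature_list))  # 4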

ctr = positive/float(len(gt))
return ctr
return positive/float(len(gt))

Function calculate_ctr refactored: as above, the intermediate ctr variable is removed.

Comment on lines -51 to +55
feature_list = []
feature_list.append(stage2_reply_features)
feature_list.append(stage2_retweet_features)
feature_list.append(stage2_comment_features)
feature_list.append(stage2_like_features)
feature_list = [
stage2_reply_features,
stage2_retweet_features,
stage2_comment_features,
stage2_like_features,
]

Lines 51-165 refactored: the repeated feature_list.append calls become a single list literal.

Comment on lines -16 to +30
test = pd.read_parquet(f'{data_path}/stage12_test')
print(test.shape)
print(f"load data took {time.time() - t1} s")

######## split data
t1 = time.time()
indexs = [i for i in range(distributed_nodes)]
step = int(len(test)/distributed_nodes)
indexs = list(range(distributed_nodes))
step = len(test) // distributed_nodes
tests = []
for i in range(distributed_nodes):
    if i<distributed_nodes-1:
        tests.append(test[i*step:(i+1)*step])
    else:
        tests.append(test[i*step:])


Lines 16-30 refactored: list(range(...)) replaces the identity comprehension and integer floor division // replaces int(.../...).
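The split logic shown with a small list standing in for the test dataframe; list(range(n)) and // give the same indices and step as the original comprehension and int() division for these positive sizes:

distributed_nodes = 3
test = list(range(10))                       # stand-in for the pandas dataframe
indexs = list(range(distributed_nodes))      # same as [i for i in range(distributed_nodes)]
step = len(test) // distributed_nodes        # same as int(len(test) / distributed_nodes) here
tests = []
for i in range(distributed_nodes):
    if i < distributed_nodes - 1:
        tests.append(test[i * step:(i + 1) * step])
    else:
        tests.append(test[i * step:])        # last chunk keeps the remainder
print(tests)  # [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]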

Comment on lines -412 to +416

     if len(x)>rw:
         return hashit(x[rw])
     elif rw<0:
-        if len(x)>0:
-            return hashit(x[-1])
-        else:
-            return 0
+        return hashit(x[-1]) if len(x)>0 else 0

Function ret_word refactored: the nested if/else is collapsed into a conditional expression, as above.

Comment on lines -432 to -440
-    if len(text_split)>1:
-        if text_split[1] in ['_']:
-            uhash += clean_text(text_split[1]) + clean_text(text_split[2])
-            text_split = text_split[2:]
-        else:
-            cl_loop = False
+    if len(text_split) > 1 and text_split[1] in ['_']:
+        uhash += clean_text(text_split[1]) + clean_text(text_split[2])
+        text_split = text_split[2:]
+    else:
+        cl_loop = False


Function extract_hash refactored: the nested ifs are merged into a single condition, as above.

Comment on lines -463 to +460
-    elif x[-1]=='?' and x[-2]=='!':
+    elif x[-1] == '?':
         return(2)
-    elif x[-1]=='!' and x[-2]=='?':
-        return(3)
-    elif x[-1]=='!' and x[-2]!='?':
+    elif x[-1] == '!':

Function check_last_char_quest refactored: the x[-2] checks are dropped from the elif conditions.

Comment on lines -1160 to +1171

############# load decoder data
df = spark.read.parquet(path_prefix+current_path+'test1_decode.parquet')
print("data decoded!")

############# load dict from stage 1
dict_names = ['tweet', 'mention']
dict_dfs = [{'col_name': name, 'dict': pd.read_parquet(
"%s/%s/%s/%s" % (path_prefix, current_path, dicts_folder, name+'.parquet'))} for name in dict_names]
dict_dfs = [
{
'col_name': name,
'dict': pd.read_parquet(
f"{path_prefix}/{current_path}/{dicts_folder}/{name + '.parquet'}"
),
}
for name in dict_names
]
_, te_test_dfs, y_mean_all_df = getTargetEncodingFeaturesDicts(mode='stage1', train_dict_load=False)

############# set up to stage 2
current_path = "/recsys2021/datapre_stage2/"

############# count encoding
ce_test_dfs = CountEncodingFeatures(df, gen_dict=True,mode="inference",train_generate=False)


Function inference_join refactored: the '%s' path formatting is replaced with an f-string.

Comment on lines -72 to +78
dict_dfs = [{'col_name': name, 'dict': df.select(spk_func.col(name).alias('dict_col'))} for name in dict_names]
return dict_dfs
return [
{
'col_name': name,
'dict': df.select(spk_func.col(name).alias('dict_col')),
}
for name in dict_names
]

Function get_dict_for_asin refactored: the list comprehension is returned directly instead of being assigned to a temporary.

Comment on lines -120 to +133
dict_dfs = []
dict_dfs.append({'col_name': 'reviewer_id', 'dict': user_df})
dict_dfs.append({'col_name': 'asin', 'dict': asin_df})
dict_dfs.append({'col_name': 'category', 'dict': cat_df})
dict_dfs.append({'col_name': 'hist_asin', 'dict': asin_df})
dict_dfs.append({'col_name': 'hist_category', 'dict': cat_df})
dict_dfs.append({'col_name': 'noclk_hist_asin', 'dict': asin_df})
dict_dfs.append({'col_name': 'noclk_hist_category', 'dict': asin_cat_df})

dict_dfs = [
{'col_name': 'reviewer_id', 'dict': user_df},
{'col_name': 'asin', 'dict': asin_df},
{'col_name': 'category', 'dict': cat_df},
{'col_name': 'hist_asin', 'dict': asin_df},
{'col_name': 'hist_category', 'dict': cat_df},
{'col_name': 'noclk_hist_asin', 'dict': asin_df},
{'col_name': 'noclk_hist_category', 'dict': asin_cat_df},
]

Function categorify_dien_data refactored: the repeated dict_dfs.append calls become a single list literal.

Comment on lines -144 to +151
with open("/home/xxx/dien/" + f'/{output_name}.pkl', "rb") as f:
voc = dict((key, value) for (key,value) in pkl.load(f).items()) #nosec
dict_df = convert_to_spark_df(voc, proc.spark)
return dict_df
with open(f'/home/xxx/dien//{output_name}.pkl', "rb") as f:
voc = dict(pkl.load(f).items())
return convert_to_spark_df(voc, proc.spark)

Function load_voc refactored: the redundant dict rebuild is simplified and the result is returned directly.
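The dropped generator was only producing a shallow copy; a quick check with a toy vocabulary:

voc = {"book_1": 0, "book_2": 1}   # stand-in for the pickled vocabulary
assert dict((key, value) for (key, value) in voc.items()) == dict(voc.items()) == dict(voc)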

Comment on lines -212 to +216
for user, r in user_map.items():
for r in user_map.values():

Function save_to_local_splitByUser refactored: iteration switches from .items() to .values() since the key is unused.
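Since the user key is never used in the loop body, iterating the values is equivalent; the records below are illustrative:

user_map = {"u1": [("itemA", 1)], "u2": [("itemB", 0), ("itemC", 1)]}
for r in user_map.values():
    print(len(r))   # 1, then 2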

Comment on lines -226 to -235
-    idx = 0
     if len(source_path_dict[output_name + fix]) == 1:
         file_name = source_path_dict[output_name + fix][0]
         shutil.copy(file_name, f"{tgt_path}")
     else:
-        for file_name in source_path_dict[output_name + fix]:
+        for idx, file_name in enumerate(source_path_dict[output_name + fix]):
             #print(f"result renamed from {file_name} to {tgt_path}_{idx}")
             shutil.copy(file_name, f"{tgt_path}_{idx}")
             shutil.rmtree(file_name, ignore_errors=True)
-            idx += 1

Function result_rename_or_convert refactored: the manually incremented idx counter is replaced with enumerate().
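enumerate() supplies the index that was previously tracked by hand; the file names below are placeholders for the source_path_dict entries:

files = ["part-00000", "part-00001"]
for idx, file_name in enumerate(files):
    print(f"{file_name} -> target_{idx}")   # part-00000 -> target_0, part-00001 -> target_1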

Comment on lines -275 to +293
reviews_info_df = process_reviews(spark, "%s/%s/raw_data/reviews_Books.json" % (path_prefix, original_folder), proc, "reviews-info")
reviews_info_df = process_reviews(
spark,
f"{path_prefix}/{original_folder}/raw_data/reviews_Books.json",
proc,
"reviews-info",
)
#reviews_info_df.repartition(1).write.format("csv").option('sep', '\t').mode("overwrite").save("%s/%s/j2c_test/reviews-info-spark" % (path_prefix, original_folder))
t1 = timer()
print(f"parse reviews-info with spark took {(t1 - t0)} secs")

t0 = timer()
item_info_df = process_meta(spark, '%s/%s/raw_data/meta_Books.json' % (path_prefix, original_folder), proc, "item-info")
item_info_df = process_meta(
spark,
f'{path_prefix}/{original_folder}/raw_data/meta_Books.json',
proc,
"item-info",
)

Function main refactored: the '%s' path formatting is replaced with f-strings and the long calls are wrapped across lines.

Comment on lines -27 to +36
reviews_info_df = spark.read.schema(reviews_info_schema).option('sep', '\t').csv(path + "/reviews-info")
item_info_df = spark.read.schema(item_info_schema).option('sep', '\t').csv(path + "/item-info")
reviews_info_df = (
spark.read.schema(reviews_info_schema)
.option('sep', '\t')
.csv(f"{path}/reviews-info")
)
item_info_df = (
spark.read.schema(item_info_schema)
.option('sep', '\t')
.csv(f"{path}/item-info")
)

Function load_csv refactored: the paths use f-strings and the chained reader calls are wrapped across lines.

Comment on lines -94 to +108
dict_dfs = [{'col_name': name, 'dict': df.select(spk_func.col(name).alias('dict_col'))} for name in dict_names]
return dict_dfs
return [
{
'col_name': name,
'dict': df.select(spk_func.col(name).alias('dict_col')),
}
for name in dict_names
]

Function get_dict_for_asin refactored: the list comprehension is returned directly, as above.

Comment on lines -153 to +170
dict_dfs = []
dict_dfs.append({'col_name': 'reviewer_id', 'dict': user_df})
dict_dfs.append({'col_name': 'asin', 'dict': asin_df})
dict_dfs.append({'col_name': 'category', 'dict': cat_df})

dict_dfs = [
{'col_name': 'reviewer_id', 'dict': user_df},
{'col_name': 'asin', 'dict': asin_df},
{'col_name': 'category', 'dict': cat_df},
]

Function categorify_dien_data refactored: the repeated dict_dfs.append calls become a single list literal.

Comment on lines -331 to +368
reviews_info_df = process_reviews(spark, "%s/%s/raw_data/reviews_Books.json" % (path_prefix, original_folder), proc, "reviews-info")
reviews_info_df = process_reviews(
spark,
f"{path_prefix}/{original_folder}/raw_data/reviews_Books.json",
proc,
"reviews-info",
)
#reviews_info_df.repartition(1).write.format("csv").option('sep', '\t').mode("overwrite").save("%s/%s/j2c_test/reviews-info-spark" % (path_prefix, original_folder))
t1 = timer()
print(f"parse reviews-info with spark took {(t1 - t0)} secs")

t0 = timer()
item_info_df = process_meta(spark, '%s/%s/raw_data/meta_Books.json' % (path_prefix, original_folder), proc, "item-info")
item_info_df = process_meta(
spark,
f'{path_prefix}/{original_folder}/raw_data/meta_Books.json',
proc,
"item-info",
)

Function main refactored: the '%s' path formatting is replaced with f-strings and the long calls are wrapped across lines.

Comment on lines -41 to +49
dict_dfs = [{'col_name': name, 'dict': proc.spark.read.parquet(
"%s/%s/%s/%s" % (proc.path_prefix, proc.current_path, proc.dicts_path, name))} for name in to_categorify_cols]
dict_dfs = [
{
'col_name': name,
'dict': proc.spark.read.parquet(
f"{proc.path_prefix}/{proc.current_path}/{proc.dicts_path}/{name}"
),
}
for name in to_categorify_cols
]

Function categorifyAllFeatures refactored: the '%s' path formatting is replaced with an f-string.

Comment on lines -41 to +49
dict_dfs = [{'col_name': name, 'dict': proc.spark.read.format("arrow").load(
"%s/%s/%s/%s" % (proc.path_prefix, proc.current_path, proc.dicts_path, name))} for name in to_categorify_cols]
dict_dfs = [
{
'col_name': name,
'dict': proc.spark.read.format("arrow").load(
f"{proc.path_prefix}/{proc.current_path}/{proc.dicts_path}/{name}"
),
}
for name in to_categorify_cols
]

Function categorifyAllFeatures refactored: the '%s' path formatting is replaced with an f-string, as above.

Comment on lines -123 to +127
print(f"start launcu_milvus")
print("start launcu_milvus")
launch_milvus()
document_store = MilvusDocumentStore(host="localhost", username="", password="",
index="flat")
print(f"start create retriever")
print("start create retriever")

Function main refactored: f-strings without placeholders are replaced with plain string literals.
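An f-string with no replacement fields is just an ordinary string literal, so dropping the prefix cannot change the output:

assert f"start launcu_milvus" == "start launcu_milvus"
assert f"start create retriever" == "start create retriever"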

Comment on lines -43 to +46
file_names = dict((key, path_prefix + os.path.join(current_path, data_folder, filename)) for key, filename in SO_FILE.items())
file_names = {
key: path_prefix + os.path.join(current_path, data_folder, filename)
for key, filename in SO_FILE.items()
}

Function main refactored: the dict() call over a generator is replaced with a dict comprehension, as in the earlier hunk.


sweep-ai bot commented Nov 23, 2023

Apply Sweep Rules to your PR?

  • Apply: All new business logic should have corresponding unit tests.
  • Apply: Refactor large functions to be more modular.
  • Apply: Add docstrings to all functions and file headers.
