From b7fe3dcb56692d1faea2a22b43ee41fccba84cb6 Mon Sep 17 00:00:00 2001 From: lizgzil Date: Tue, 26 Oct 2021 17:35:32 +0100 Subject: [PATCH 01/11] Add script to create a sample of job adverts, also a notebook to view sampled data dates as compared to all data, and finally some tweaks to the predict sentence class script to process this new form of data --- .../TextKernel Sample - 25th Oct 2021.py | 139 +++++++++++++ .../predict_sentence_class.py | 185 +++++++++++++++--- .../tk_data_analysis/get_tk_sample.py | 99 ++++++++++ 3 files changed, 396 insertions(+), 27 deletions(-) create mode 100644 skills_taxonomy_v2/analysis/tk_analysis/TextKernel Sample - 25th Oct 2021.py create mode 100644 skills_taxonomy_v2/pipeline/tk_data_analysis/get_tk_sample.py diff --git a/skills_taxonomy_v2/analysis/tk_analysis/TextKernel Sample - 25th Oct 2021.py b/skills_taxonomy_v2/analysis/tk_analysis/TextKernel Sample - 25th Oct 2021.py new file mode 100644 index 0000000..5b73629 --- /dev/null +++ b/skills_taxonomy_v2/analysis/tk_analysis/TextKernel Sample - 25th Oct 2021.py @@ -0,0 +1,139 @@ +# --- +# jupyter: +# jupytext: +# cell_metadata_filter: -all +# comment_magics: true +# text_representation: +# extension: .py +# format_name: percent +# format_version: '1.3' +# jupytext_version: 1.11.4 +# kernelspec: +# display_name: Python 3 (ipykernel) +# language: python +# name: python3 +# --- + +# %% [markdown] +# ## Examining the sample of textkernel data used from the extension work from 25th October 2021. +# +# - Not all the job ids have dates given in the date metadata (it may not even be a key, or may have a value of None). +# i.e. there are 62892486 job adverts, but only 50566709 keys in the dates metadata +# +# + +# %% +from skills_taxonomy_v2.getters.s3_data import ( + load_s3_data, + get_s3_data_paths, + save_to_s3, +) + +from collections import Counter, defaultdict +import random +import os + +from datetime import datetime +from tqdm import tqdm +import pandas as pd +import boto3 +import matplotlib.pyplot as plt + +bucket_name = "skills-taxonomy-v2" +s3 = boto3.resource("s3") + +# %% [markdown] +# ### All the TK data with dates + +# %% +tk_dates = {} +for file_name in tqdm(range(0, 13)): + file_date_dict = load_s3_data( + s3, bucket_name, f"outputs/tk_data_analysis/metadata_date/{file_name}.json" + ) + tk_dates.update({k: f[0] for k, f in file_date_dict.items()}) + +print(len(tk_dates)) + +# %% +job_ads_date_count = defaultdict(int) + +for k, v in tqdm(tk_dates.items()): + if v: + date = v[0:7] + job_ads_date_count[date] += 1 + else: + job_ads_date_count["No date given"] += 1 + +# %% +sum(job_ads_date_count.values()) == total_n_job_adverts + +# %% [markdown] +# ### Dates for the sample + +# %% +sample_dict = load_s3_data( + s3, bucket_name, "outputs/tk_sample_data/sample_file_locations.json" +) + +# %% +sample_dict["historical/2020/2020-03-11/jobs_0.0.jsonl.gz"][0:10] + +# %% +sum([len(v) for v in sample_dict.values()]) + +# %% +job_ads_date_count_sample = defaultdict(int) +for job_id_list in tqdm(sample_dict.values()): + for job_id in job_id_list: + v = tk_dates.get(job_id) + if v: + date = v[0:7] + job_ads_date_count_sample[date] += 1 + else: + job_ads_date_count_sample["No date given"] += 1 + +# %% +sum(job_ads_date_count_sample.values()) + + +# %% [markdown] +# ### Plot proportions together + +# %% +def find_num_dates(count_dict): + num_dates = { + int(k.split("-")[0]) + int(k.split("-")[1]) / 12: v + for k, v in count_dict.items() + if k != "No date given" + } + num_dates[2014] = count_dict["No date given"] + return num_dates + + +# %% +num_dates = find_num_dates(job_ads_date_count) +num_dates_sample = find_num_dates(job_ads_date_count_sample) + +# %% +plt.figure(figsize=(10, 4)) +plt.bar( + num_dates.keys(), + [v / sum(num_dates.values()) for v in num_dates.values()], + width=0.1, + alpha=0.5, + label="All data", +) +plt.bar( + num_dates_sample.keys(), + [v / sum(num_dates_sample.values()) for v in num_dates_sample.values()], + width=0.1, + color="red", + alpha=0.5, + label="Sample of data", +) +plt.legend() +plt.xlabel("Date of job advert (2014 = no date given)") +plt.ylabel("Proportion") + +# %% diff --git a/skills_taxonomy_v2/pipeline/sentence_classifier/predict_sentence_class.py b/skills_taxonomy_v2/pipeline/sentence_classifier/predict_sentence_class.py index 2acb143..aa01f42 100644 --- a/skills_taxonomy_v2/pipeline/sentence_classifier/predict_sentence_class.py +++ b/skills_taxonomy_v2/pipeline/sentence_classifier/predict_sentence_class.py @@ -41,6 +41,7 @@ from skills_taxonomy_v2 import PROJECT_DIR, BUCKET_NAME +from skills_taxonomy_v2.getters.s3_data import load_s3_data, save_to_s3 logger = logging.getLogger(__name__) @@ -68,7 +69,7 @@ def load_local_data(file_name): return data -def load_s3_data(file_name, s3): +def load_s3_text_data(file_name, s3): """ Load from S3 locations - jsonl.gz file_name: S3 key @@ -248,7 +249,7 @@ def run_predict_sentence_class( if data_local: data = load_local_data(data_path) else: - data = load_s3_data(data_path, s3) + data = load_s3_text_data(data_path, s3) data = data[0:10000] if data: @@ -329,12 +330,124 @@ def run_predict_sentence_class( save_outputs_to_s3(s3, skill_sentences_dict, output_file_dir) +def run_predict_sentence_class_presample( + input_dir, data_dir, model_config_name, output_dir, sampled_data_loc +): + """ + Get predictions from the sample of job adverts already found + in get_tk_sample.py and save out for every relevant file in the dir. + """ + + sent_classifier, _ = load_model(model_config_name) + nlp = spacy.load("en_core_web_sm") + + s3 = boto3.resource("s3") + + sample_dict = load_s3_data(s3, BUCKET_NAME, sampled_data_loc) + + # Make predictions and save output for data path(s) + logger.info(f"Running predictions on {len(sample_dict)} data files ...") + for data_subpath, job_ids in sample_dict.items(): + # Run predictions and save outputs iteratively + data_path = os.path.join(input_dir, data_dir, data_subpath) + logger.info(f"Loading data from {data_path} ...") + data = load_s3_data(s3, BUCKET_NAME, data_path) + + job_ids_set = set(job_ids) + + # Sample of job adverts used for this file + neccessary_fields = ["full_text", "job_id"] + data = [ + {field: job_ad.get(field, None) for field in neccessary_fields} + for job_ad in data + if job_ad["job_id"] in job_ids_set + ] + + # CHANGE ==========================================!!!!!!!!!!!! + data = data[0:10] + + if data: + output_file_dir = get_output_name( + data_path, input_dir, output_dir, model_config_name + ) + + logger.info(f"Splitting sentences ...") + start_time = time.time() + with Pool(4) as pool: # 4 cpus + partial_split_sentence = partial(split_sentence, nlp=nlp, min_length=30) + split_sentence_pool_output = pool.map(partial_split_sentence, data) + logger.info(f"Splitting sentences took {time.time() - start_time} seconds") + + # Process output into one list of sentences for all documents + sentences = [] + job_ids = [] + for i, (job_id, s) in enumerate(split_sentence_pool_output): + if s: + sentences += s + job_ids += [job_id] * len(s) + + logger.info(f"Processing sentences took {time.time() - start_time} seconds") + + if sentences: + logger.info(f"Transforming skill sentences ...") + sentences_vec = sent_classifier.transform(sentences) + pool_sentences_vec = [ + (vec_ix, [vec]) for vec_ix, vec in enumerate(sentences_vec) + ] + + logger.info(f"Chunking up sentences ...") + start_time = time.time() + # Manually chunk up the data to predict multiple in a pool + # This is because predict can't deal with massive vectors + pool_sentences_vecs = [] + pool_sentences_vec = [] + for vec_ix, vec in enumerate(sentences_vec): + pool_sentences_vec.append((vec_ix, vec)) + if len(pool_sentences_vec) > 1000: + pool_sentences_vecs.append(pool_sentences_vec) + pool_sentences_vec = [] + if len(pool_sentences_vec) != 0: + # Add the final chunk if not empty + pool_sentences_vecs.append(pool_sentences_vec) + logger.info( + f"Chunking up sentences into {len(pool_sentences_vecs)} chunks took {time.time() - start_time} seconds" + ) + + logger.info(f"Predicting skill sentences ...") + start_time = time.time() + with Pool(4) as pool: # 4 cpus + partial_predict_sentences = partial( + predict_sentences, sent_classifier=sent_classifier + ) + predict_sentences_pool_output = pool.map( + partial_predict_sentences, pool_sentences_vecs + ) + logger.info( + f"Predicting on {len(sentences)} sentences took {time.time() - start_time} seconds" + ) + + # Process output into one list of sentences for all documents + logger.info(f"Combining data for output ...") + start_time = time.time() + skill_sentences_dict = defaultdict(list) + for chunk_output in predict_sentences_pool_output: + for (sent_ix, pred) in chunk_output: + if pred == 1: + job_id = job_ids[sent_ix] + sentence = sentences[sent_ix] + skill_sentences_dict[job_id].append(sentence) + logger.info(f"Combining output took {time.time() - start_time} seconds") + + logger.info(f"Saving data to {output_file_dir} ...") + save_to_s3(s3, BUCKET_NAME, skill_sentences_dict, output_file_dir) + + def parse_arguments(parser): parser.add_argument( "--config_path", help="Path to config file", - default="skills_taxonomy_v2/config/predict_skill_sentences/2021.08.16.local.sample.yaml", + default="skills_taxonomy_v2/config/predict_skill_sentences/2021.10.26.yaml", ) args, unknown = parser.parse_known_args() @@ -355,29 +468,47 @@ def parse_arguments(parser): flow_config = config["flows"][FLOW_ID] params = flow_config["params"] - if not params["sample_data_paths"]: - # If you don't want to sample the data you can set these to None - params["random_seed"] = None - params["sample_size"] = None - - # Output data in a subfolder with the name of the model used to make the predictions - if params["data_local"]: - input_dir = os.path.join(PROJECT_DIR, params["input_dir"]) - output_dir = os.path.join( - PROJECT_DIR, params["output_dir"], params["model_config_name"] + if params.get("sampled_data_loc"): + # >=26th October 2021 configs should have this + # where job advert sampling has already been done + # Previously the outputs were stored in a folder with the + # model version, now store in the config version. + run_predict_sentence_class_presample( + params["input_dir"], + params["data_dir"], + params["model_config_name"], + os.path.join( + params["output_dir"], + os.path.basename(args.config_path).split(".yaml")[0], + ), + params["sampled_data_loc"], ) + else: - # If we are pulling the data from S3 we don't want the paths to join with our local project_dir - input_dir = params["input_dir"] - output_dir = os.path.join(params["output_dir"], params["model_config_name"]) - - run_predict_sentence_class( - input_dir, - params["data_dir"], - params["model_config_name"], - output_dir, - data_local=params["data_local"], - sample_data_paths=params["sample_data_paths"], - random_seed=params["random_seed"], - sample_size=params["sample_size"], - ) + # Old version + if not params.get("sample_data_paths"): + # If you don't want to sample the data you can set these to None + params["random_seed"] = None + params["sample_size"] = None + + # Output data in a subfolder with the name of the model used to make the predictions + if params["data_local"]: + input_dir = os.path.join(PROJECT_DIR, params["input_dir"]) + output_dir = os.path.join( + PROJECT_DIR, params["output_dir"], params["model_config_name"] + ) + else: + # If we are pulling the data from S3 we don't want the paths to join with our local project_dir + input_dir = params["input_dir"] + output_dir = os.path.join(params["output_dir"], params["model_config_name"]) + + run_predict_sentence_class( + input_dir, + params["data_dir"], + params["model_config_name"], + output_dir, + data_local=params["data_local"], + sample_data_paths=params["sample_data_paths"], + random_seed=params["random_seed"], + sample_size=params["sample_size"], + ) diff --git a/skills_taxonomy_v2/pipeline/tk_data_analysis/get_tk_sample.py b/skills_taxonomy_v2/pipeline/tk_data_analysis/get_tk_sample.py new file mode 100644 index 0000000..08b74fc --- /dev/null +++ b/skills_taxonomy_v2/pipeline/tk_data_analysis/get_tk_sample.py @@ -0,0 +1,99 @@ +""" +Take a sample of the TK job adverts to be used in the pipeline. + +Output is a dict of each tk file name and a list of the job ids +within it which are included in the sample. +e.g. {"historical/...0.json": ['6001f8701aeb4072a8eb0cca85535208', ...]} +""" + +from skills_taxonomy_v2.getters.s3_data import ( + load_s3_data, + get_s3_data_paths, + save_to_s3, +) + +from argparse import ArgumentParser +from collections import defaultdict +import random +import os +import yaml + +from tqdm import tqdm +import boto3 + +from skills_taxonomy_v2 import BUCKET_NAME + +s3 = boto3.resource("s3") + + +def parse_arguments(parser): + + parser.add_argument( + "--config_path", + help="Path to config file", + default="skills_taxonomy_v2/config/tk_data_sample/2021.10.25.yaml", + ) + + return parser.parse_args() + + +if __name__ == "__main__": + + parser = ArgumentParser() + args = parse_arguments(parser) + + with open(args.config_path, "r") as f: + config = yaml.load(f, Loader=yaml.FullLoader) + + FLOW_ID = "get_tk_sample" + flow_config = config["flows"][FLOW_ID] + params = flow_config["params"] + + tk_metadata_dir = params["tk_metadata_dir"] + + # Get all the job ids in the job id:file location dictionary + # from tk metadata (outputted from "get_bulk_metadata.py") + tk_metadata_paths = get_s3_data_paths( + s3, BUCKET_NAME, tk_metadata_dir, file_types=["*.json"] + ) + + # There is some duplication in job id, so use unique set + job_ids = set() + for tk_metadata_path in tqdm(tk_metadata_paths): + file_dict = load_s3_data(s3, BUCKET_NAME, tk_metadata_path) + job_ids.update(set(file_dict.keys())) + + # Take a random sample + random.seed(params["random_seed"]) + job_ids_sample = random.sample(job_ids, params["sample_size"]) + + del job_ids + + # It's quicker to query a set than a list + job_ids_sample_set = set(job_ids_sample) + + # Output a dict of the job ids sampled from each file. + # The nuance is that multiple files may have the same job id, + # so only include the first one when iterating through the files + # randomly + + sample_locs = defaultdict(list) + job_ids_seen = set() + random.seed(params["random_seed"]) + random.shuffle(tk_metadata_paths) + + for tk_metadata_path in tqdm(tk_metadata_paths): + file_dict = load_s3_data(s3, BUCKET_NAME, tk_metadata_path) + for job_id, file_name in file_dict.items(): + if (job_id in job_ids_sample_set) and (job_id not in job_ids_seen): + sample_locs[file_name].append(job_id) + job_ids_seen.add(job_id) + + print(sum([len(v) for v in sample_locs.values()]) == params["sample_size"]) + + save_to_s3( + s3, + BUCKET_NAME, + sample_locs, + os.path.join(params["output_dir"], "sample_file_locations.json"), + ) From 3ce5aee4a70bf5ac0c76526039ebe6abe464c64c Mon Sep 17 00:00:00 2001 From: lizgzil Date: Tue, 26 Oct 2021 17:35:52 +0100 Subject: [PATCH 02/11] New config for predict sentnce class --- .../config/predict_skill_sentences/2021.10.26.yaml | 9 +++++++++ 1 file changed, 9 insertions(+) create mode 100644 skills_taxonomy_v2/config/predict_skill_sentences/2021.10.26.yaml diff --git a/skills_taxonomy_v2/config/predict_skill_sentences/2021.10.26.yaml b/skills_taxonomy_v2/config/predict_skill_sentences/2021.10.26.yaml new file mode 100644 index 0000000..dfca405 --- /dev/null +++ b/skills_taxonomy_v2/config/predict_skill_sentences/2021.10.26.yaml @@ -0,0 +1,9 @@ +flows: + predict_skill_sentences_flow: + params: + input_dir: "inputs/data/" + data_dir: "textkernel-files/" + model_config_name: "2021.08.16" + output_dir: "outputs/sentence_classifier/data/skill_sentences" + sampled_data_loc: "outputs/tk_sample_data/sample_file_locations.json" + run_id: 0 From ff0640217c29e9670a0a4cd4647d93bd58cdd01c Mon Sep 17 00:00:00 2001 From: lizgzil Date: Tue, 26 Oct 2021 17:53:51 +0100 Subject: [PATCH 03/11] Remove testing change --- .../pipeline/sentence_classifier/predict_sentence_class.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/skills_taxonomy_v2/pipeline/sentence_classifier/predict_sentence_class.py b/skills_taxonomy_v2/pipeline/sentence_classifier/predict_sentence_class.py index aa01f42..30763fc 100644 --- a/skills_taxonomy_v2/pipeline/sentence_classifier/predict_sentence_class.py +++ b/skills_taxonomy_v2/pipeline/sentence_classifier/predict_sentence_class.py @@ -363,9 +363,6 @@ def run_predict_sentence_class_presample( if job_ad["job_id"] in job_ids_set ] - # CHANGE ==========================================!!!!!!!!!!!! - data = data[0:10] - if data: output_file_dir = get_output_name( data_path, input_dir, output_dir, model_config_name From 0982566aa1e23a6ca0b808c3c483db1ed73f17be Mon Sep 17 00:00:00 2001 From: lizgzil Date: Tue, 26 Oct 2021 18:03:09 +0100 Subject: [PATCH 04/11] config file for getting job advert sample --- skills_taxonomy_v2/config/tk_data_sample/2021.10.25.yaml | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 skills_taxonomy_v2/config/tk_data_sample/2021.10.25.yaml diff --git a/skills_taxonomy_v2/config/tk_data_sample/2021.10.25.yaml b/skills_taxonomy_v2/config/tk_data_sample/2021.10.25.yaml new file mode 100644 index 0000000..86f7ddc --- /dev/null +++ b/skills_taxonomy_v2/config/tk_data_sample/2021.10.25.yaml @@ -0,0 +1,7 @@ +flows: + get_tk_sample: + params: + tk_metadata_dir: "outputs/tk_data_analysis/metadata_file/" + sample_size: 5000000 + random_seed: 42 + output_dir: "outputs/tk_sample_data/" \ No newline at end of file From a10348ecd42dbf97d165fbc65e53d700c1f21966 Mon Sep 17 00:00:00 2001 From: lizgzil Date: Wed, 27 Oct 2021 17:30:12 +0100 Subject: [PATCH 05/11] Use nltk sentence splitter instead of nlp and include functions for chunking of data when it's being used --- .../pipeline/sentence_classifier/utils.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/skills_taxonomy_v2/pipeline/sentence_classifier/utils.py b/skills_taxonomy_v2/pipeline/sentence_classifier/utils.py index 556ba93..5cee2c4 100644 --- a/skills_taxonomy_v2/pipeline/sentence_classifier/utils.py +++ b/skills_taxonomy_v2/pipeline/sentence_classifier/utils.py @@ -17,7 +17,7 @@ import os import boto3 from s3fs.core import S3FileSystem -from functools import lru_cache +from functools import lru_cache, partial import nltk @@ -47,7 +47,7 @@ def text_cleaning(text): return text.lower() -def split_sentence(data, nlp, min_length=30): +def split_sentence(data, min_length=30): """ Split and clean one sentence. Output is two lists, a list of each sentence and a list of the job_ids they are from. @@ -60,15 +60,22 @@ def split_sentence(data, nlp, min_length=30): sentences = [] job_id = data.get("job_id") # Split up sentences - doc = nlp(text) - for sent in doc.sents: - sentence = text_cleaning(sent.text) + sents = nltk.sent_tokenize(text) + for sent in sents: + sentence = text_cleaning(sent) if len(sentence) > min_length: sentences.append(sentence) return job_id, sentences else: return None, None +def split_sentence_over_chunk(chunk, min_length): + partial_split_sentence = partial(split_sentence, min_length=min_length) + return list(map(partial_split_sentence, chunk)) + +def make_chunks(lst, n): + for i in range(0, len(lst), n): + yield lst[i:i + n] @lru_cache(maxsize=None) def load_training_data_from_s3(prefix="final_training_data"): From 11f75d5bf97bee7d41145dd5c933088108a1862e Mon Sep 17 00:00:00 2001 From: lizgzil Date: Wed, 27 Oct 2021 17:31:46 +0100 Subject: [PATCH 06/11] Add a batch_size parameter to BertVectorizer and SentenceClassifier --- .../sentence_classifier/sentence_classifier.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/skills_taxonomy_v2/pipeline/sentence_classifier/sentence_classifier.py b/skills_taxonomy_v2/pipeline/sentence_classifier/sentence_classifier.py index 4256c1c..46214a0 100644 --- a/skills_taxonomy_v2/pipeline/sentence_classifier/sentence_classifier.py +++ b/skills_taxonomy_v2/pipeline/sentence_classifier/sentence_classifier.py @@ -64,9 +64,11 @@ def __init__( self, bert_model_name="sentence-transformers/paraphrase-MiniLM-L6-v2", multi_process=True, + batch_size=32 ): self.bert_model_name = bert_model_name self.multi_process = multi_process + self.batch_size = batch_size def fit(self, *_): self.bert_model = SentenceTransformer(self.bert_model_name) @@ -79,7 +81,7 @@ def transform(self, texts): if self.multi_process: print(".. with multiprocessing") pool = self.bert_model.start_multi_process_pool() - self.embedded_x = self.bert_model.encode_multi_process(texts, pool) + self.embedded_x = self.bert_model.encode_multi_process(texts, pool, batch_size=self.batch_size) self.bert_model.stop_multi_process_pool(pool) else: self.embedded_x = self.bert_model.encode(texts, show_progress_bar=True) @@ -126,6 +128,7 @@ def __init__( test_size=0.15, bert_model_name="sentence-transformers/paraphrase-MiniLM-L6-v2", multi_process=True, + batch_size=32, max_depth=7, min_child_weight=1, gamma=0.0, @@ -144,6 +147,7 @@ def __init__( self.bert_model_name = bert_model_name self.bert_model_name = bert_model_name self.multi_process = multi_process + self.batch_size = batch_size self.max_depth = max_depth self.min_child_weight = min_child_weight self.gamma = gamma @@ -195,7 +199,9 @@ def split_data(self, training_data, verbose=False): def load_bert(self): self.bert_vectorizer = BertVectorizer( - bert_model_name=self.bert_model_name, multi_process=self.multi_process + bert_model_name=self.bert_model_name, + multi_process=self.multi_process, + batch_size=self.batch_size ) self.bert_vectorizer.fit() From 50924a9cde43fdf5e147ef93b3e27471d1c76086 Mon Sep 17 00:00:00 2001 From: lizgzil Date: Wed, 27 Oct 2021 17:33:22 +0100 Subject: [PATCH 07/11] Use a multiprocess and batch size optional parameters and chunk up data for splitting, change to 27.yaml as default --- .../predict_sentence_class.py | 44 ++++++++++++------- 1 file changed, 29 insertions(+), 15 deletions(-) diff --git a/skills_taxonomy_v2/pipeline/sentence_classifier/predict_sentence_class.py b/skills_taxonomy_v2/pipeline/sentence_classifier/predict_sentence_class.py index 30763fc..b1ef197 100644 --- a/skills_taxonomy_v2/pipeline/sentence_classifier/predict_sentence_class.py +++ b/skills_taxonomy_v2/pipeline/sentence_classifier/predict_sentence_class.py @@ -37,7 +37,7 @@ SentenceClassifier, ) -from skills_taxonomy_v2.pipeline.sentence_classifier.utils import split_sentence +from skills_taxonomy_v2.pipeline.sentence_classifier.utils import split_sentence, make_chunks, split_sentence_over_chunk from skills_taxonomy_v2 import PROJECT_DIR, BUCKET_NAME @@ -84,7 +84,7 @@ def load_s3_text_data(file_name, s3): return data -def load_model(config_name): # change this to be in s3! +def load_model(config_name, multi_process=None, batch_size=32): # change this to be in s3! # Load sentence classifier trained model and config it came with # Be careful here if you change output locations in sentence_classifier.py @@ -94,14 +94,18 @@ def load_model(config_name): # change this to be in s3! with open(config_dir, "r") as f: config = yaml.load(f, Loader=yaml.FullLoader) + if multi_process is None: + multi_process = config["flows"]["sentence_classifier_flow"]["params"][ + "multi_process" + ] + # Loading the model sent_classifier = SentenceClassifier( bert_model_name=config["flows"]["sentence_classifier_flow"]["params"][ "bert_model_name" ], - multi_process=config["flows"]["sentence_classifier_flow"]["params"][ - "multi_process" - ], + multi_process=multi_process, + batch_size=batch_size, max_depth=config["flows"]["sentence_classifier_flow"]["params"]["max_depth"], min_child_weight=config["flows"]["sentence_classifier_flow"]["params"][ "min_child_weight" @@ -338,7 +342,7 @@ def run_predict_sentence_class_presample( in get_tk_sample.py and save out for every relevant file in the dir. """ - sent_classifier, _ = load_model(model_config_name) + sent_classifier, _ = load_model(model_config_name, multi_process=True, batch_size=64) nlp = spacy.load("en_core_web_sm") s3 = boto3.resource("s3") @@ -367,22 +371,32 @@ def run_predict_sentence_class_presample( output_file_dir = get_output_name( data_path, input_dir, output_dir, model_config_name ) + # # If this file already exists don't re-run it (since you are using a different model to split sentences, you prob want to process all from the beginning) + # try: + # exist_test = load_s3_data(s3, BUCKET_NAME, output_file_dir) + # del exist_test + # print("Already created") + # except: + # print("Not created yet") logger.info(f"Splitting sentences ...") start_time = time.time() with Pool(4) as pool: # 4 cpus - partial_split_sentence = partial(split_sentence, nlp=nlp, min_length=30) - split_sentence_pool_output = pool.map(partial_split_sentence, data) + chunks = make_chunks(data, 1000) # chunks of 1000s sentences + partial_split_sentence = partial(split_sentence_over_chunk, min_length=30) + # NB the output will be a list of lists, so make sure to flatten after this! + split_sentence_pool_output = pool.map(partial_split_sentence, chunks) logger.info(f"Splitting sentences took {time.time() - start_time} seconds") - # Process output into one list of sentences for all documents + # Flatten and process output into one list of sentences for all documents + start_time = time.time() sentences = [] job_ids = [] - for i, (job_id, s) in enumerate(split_sentence_pool_output): - if s: - sentences += s - job_ids += [job_id] * len(s) - + for chunk_split_sentence_pool_output in split_sentence_pool_output: + for job_id, s in chunk_split_sentence_pool_output: + if s: + sentences += s + job_ids += [job_id] * len(s) logger.info(f"Processing sentences took {time.time() - start_time} seconds") if sentences: @@ -444,7 +458,7 @@ def parse_arguments(parser): parser.add_argument( "--config_path", help="Path to config file", - default="skills_taxonomy_v2/config/predict_skill_sentences/2021.10.26.yaml", + default="skills_taxonomy_v2/config/predict_skill_sentences/2021.10.27.yaml", ) args, unknown = parser.parse_known_args() From 38876ee86b21adc2908a129f8b5fc04aec6b4124 Mon Sep 17 00:00:00 2001 From: lizgzil Date: Wed, 27 Oct 2021 17:33:54 +0100 Subject: [PATCH 08/11] Add new config for predict skill sentences --- .../config/predict_skill_sentences/2021.10.27.yaml | 9 +++++++++ 1 file changed, 9 insertions(+) create mode 100644 skills_taxonomy_v2/config/predict_skill_sentences/2021.10.27.yaml diff --git a/skills_taxonomy_v2/config/predict_skill_sentences/2021.10.27.yaml b/skills_taxonomy_v2/config/predict_skill_sentences/2021.10.27.yaml new file mode 100644 index 0000000..dfca405 --- /dev/null +++ b/skills_taxonomy_v2/config/predict_skill_sentences/2021.10.27.yaml @@ -0,0 +1,9 @@ +flows: + predict_skill_sentences_flow: + params: + input_dir: "inputs/data/" + data_dir: "textkernel-files/" + model_config_name: "2021.08.16" + output_dir: "outputs/sentence_classifier/data/skill_sentences" + sampled_data_loc: "outputs/tk_sample_data/sample_file_locations.json" + run_id: 0 From ea0ca6070458610b4207421d26279fbf019a8429 Mon Sep 17 00:00:00 2001 From: lizgzil Date: Fri, 5 Nov 2021 15:48:02 +0000 Subject: [PATCH 09/11] Add information about the data sampling and the new run of predict skill sentences to the relevant READMEs --- README.md | 7 ++++--- .../pipeline/sentence_classifier/README.md | 12 ++++++++++-- .../pipeline/tk_data_analysis/README.md | 17 +++++++++++++++++ 3 files changed, 31 insertions(+), 5 deletions(-) create mode 100644 skills_taxonomy_v2/pipeline/tk_data_analysis/README.md diff --git a/README.md b/README.md index b4a366e..2213a81 100644 --- a/README.md +++ b/README.md @@ -14,9 +14,10 @@ An overview of the methodology, coloured by the three main steps to the pipeline More details of the steps included in this project, and running instructions, can be found in their respective READMEs: -1. [sentence_classifier](skills_taxonomy_v2/pipeline/sentence_classifier/README.md) - Training a classifier to predict skill sentences. -2. [skills_extraction](skills_taxonomy_v2/pipeline/skills_extraction/README.md) - Extracting skills from skill sentences. -3. [skills_taxonomy](skills_taxonomy_v2/pipeline/skills_taxonomy/README.md) - Building the skills taxonomy from extracted skills. +1. [tk_data_analysis](skills_taxonomy_v2/pipeline/tk_data_analysis/README.md) - Get a sample of the TextKernel job adverts. +2. [sentence_classifier](skills_taxonomy_v2/pipeline/sentence_classifier/README.md) - Training a classifier to predict skill sentences. +3. [skills_extraction](skills_taxonomy_v2/pipeline/skills_extraction/README.md) - Extracting skills from skill sentences. +4. [skills_taxonomy](skills_taxonomy_v2/pipeline/skills_taxonomy/README.md) - Building the skills taxonomy from extracted skills. ### Analysis diff --git a/skills_taxonomy_v2/pipeline/sentence_classifier/README.md b/skills_taxonomy_v2/pipeline/sentence_classifier/README.md index 9a4c313..0f76472 100644 --- a/skills_taxonomy_v2/pipeline/sentence_classifier/README.md +++ b/skills_taxonomy_v2/pipeline/sentence_classifier/README.md @@ -225,12 +225,20 @@ with the most recent config file will take in job adverts, split them into sente {'job_id_1': [('sentence1'), ('sentence2')], 'job_id_2': [('sentence1'), ('sentence2'} ``` -To predict on all job adverts in the TextKernel data on S3, on the EC2 instance I ran +### `2021.10.27.yaml` config file: + +When using the `2021.10.27.yaml` config file skill sentences are predicted on a pre-determined sample of 5 million job adverts (found via running `get_tk_sample.py`). This was run using: ``` -python skills_taxonomy_v2/pipeline/sentence_classifier/predict_sentence_class.py --config_path 'skills_taxonomy_v2/config/predict_skill_sentences/2021.08.16.yaml' +python skills_taxonomy_v2/pipeline/sentence_classifier/predict_sentence_class.py --config_path 'skills_taxonomy_v2/config/predict_skill_sentences/2021.10.27.yaml' ``` +Skill sentences for each job advert and file are stored in `outputs/sentence_classifier/data/skill_sentences/2021.10.27/`. + +Out of the 647 files of job adverts, 516 had skill sentences in. This is because the 'jobs_expired' files were included in the sample and these don't contain the job advert text. This leaves us with a sample of 4,312,285 job adverts. + + + ### From `2021.07.09.yaml`: This will run predictions on a random sample of 10 of the 686 data files. The outputs of this yielded 5,823,903 skill sentences from the 1,000,000 job adverts. diff --git a/skills_taxonomy_v2/pipeline/tk_data_analysis/README.md b/skills_taxonomy_v2/pipeline/tk_data_analysis/README.md new file mode 100644 index 0000000..7977b2e --- /dev/null +++ b/skills_taxonomy_v2/pipeline/tk_data_analysis/README.md @@ -0,0 +1,17 @@ +# TK data sample + +In the first step of finding a skills taxonomy from job adverts we take a sample of the job advert data. + +This can be done by running: + +``` +python skills_taxonomy_v2/pipeline/tk_data_analysis/get_tk_sample.py --config_path skills_taxonomy_v2/config/tk_data_sample/2021.10.25.yaml +``` + +## `2021.10.25.yaml` config file + +This samples 5,000,000 job adverts randomly from all the TextKernel files. This sample was further reduced to 4,312,285 job adverts since some of the sample included job adverts which don't have the full text field available. + +The output is a dict of each TextKernel file name and a list of the job ids within it which are included in the sample. e.g. `{"historical/...0.json": ['6001f8701aeb4072a8eb0cca85535208', ...]}`. This then provides an easy way to open the original file and get the text from each of the job adverts included in the sample. + +This output is saved in `outputs/tk_sample_data/sample_file_locations.json`. From 1b79e3c536c00e61f4b42f85402f8911d62d64f1 Mon Sep 17 00:00:00 2001 From: lizgzil Date: Fri, 5 Nov 2021 15:48:42 +0000 Subject: [PATCH 10/11] change batch size back to 32 --- .../pipeline/sentence_classifier/predict_sentence_class.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/skills_taxonomy_v2/pipeline/sentence_classifier/predict_sentence_class.py b/skills_taxonomy_v2/pipeline/sentence_classifier/predict_sentence_class.py index b1ef197..8dcf9b5 100644 --- a/skills_taxonomy_v2/pipeline/sentence_classifier/predict_sentence_class.py +++ b/skills_taxonomy_v2/pipeline/sentence_classifier/predict_sentence_class.py @@ -342,7 +342,7 @@ def run_predict_sentence_class_presample( in get_tk_sample.py and save out for every relevant file in the dir. """ - sent_classifier, _ = load_model(model_config_name, multi_process=True, batch_size=64) + sent_classifier, _ = load_model(model_config_name, multi_process=True, batch_size=32) nlp = spacy.load("en_core_web_sm") s3 = boto3.resource("s3") From c621c22176494738be482e681bf3def9f45274e1 Mon Sep 17 00:00:00 2001 From: lizgzil Date: Fri, 5 Nov 2021 16:54:08 +0000 Subject: [PATCH 11/11] Outputs plots of data sample proportion comparisons --- .../TextKernel Sample - 25th Oct 2021.py | 85 +++++++++++++++++-- 1 file changed, 79 insertions(+), 6 deletions(-) diff --git a/skills_taxonomy_v2/analysis/tk_analysis/TextKernel Sample - 25th Oct 2021.py b/skills_taxonomy_v2/analysis/tk_analysis/TextKernel Sample - 25th Oct 2021.py index 5b73629..2780708 100644 --- a/skills_taxonomy_v2/analysis/tk_analysis/TextKernel Sample - 25th Oct 2021.py +++ b/skills_taxonomy_v2/analysis/tk_analysis/TextKernel Sample - 25th Oct 2021.py @@ -65,11 +65,8 @@ else: job_ads_date_count["No date given"] += 1 -# %% -sum(job_ads_date_count.values()) == total_n_job_adverts - # %% [markdown] -# ### Dates for the sample +# ### Import the sample and get some stats on it # %% sample_dict = load_s3_data( @@ -77,10 +74,21 @@ ) # %% -sample_dict["historical/2020/2020-03-11/jobs_0.0.jsonl.gz"][0:10] +len(sample_dict) + +# %% +plt.hist([len(v) for k,v in sample_dict.items()], bins =30); +plt.title("Number of job adverts in each file of sample"); + +# %% +plt.hist([len(v) for k,v in sample_dict.items() if "jobs_expired" not in k], bins =30); +plt.title("Number of job adverts in each file of sample from not 'jobs_expired' files"); # %% -sum([len(v) for v in sample_dict.values()]) +print(f"There are {sum([len(v) for k,v in sample_dict.items() if 'jobs_expired' not in k])} job adverts in the sample which aren't from the 'jobs_expired' files (which don't have full text available in the metadata)") + +# %% [markdown] +# ### Dates for the sample # %% job_ads_date_count_sample = defaultdict(int) @@ -135,5 +143,70 @@ def find_num_dates(count_dict): plt.legend() plt.xlabel("Date of job advert (2014 = no date given)") plt.ylabel("Proportion") +plt.savefig("../../../outputs/tk_analysis/tk_sample_dates.pdf") + +# %% [markdown] +# ### In comparison to sample without the jobs expired + +# %% +# All the job ids from the files with "jobs_expired" in their name +jobs_expired_job_ids = set() +for file_name in tqdm(range(0, 13)): + file_date_dict = load_s3_data( + s3, bucket_name, f"outputs/tk_data_analysis/metadata_file/{file_name}.json" + ) + jobs_expired_job_ids.update(set({k for k, f in file_date_dict.items() if 'jobs_expired' in f})) + +print(len(jobs_expired_job_ids)) + +# %% +job_ads_date_count_notexpired = defaultdict(int) + +for k, v in tqdm(tk_dates.items()): + if k not in jobs_expired_job_ids: + if v: + date = v[0:7] + job_ads_date_count_notexpired[date] += 1 + else: + job_ads_date_count_notexpired["No date given"] += 1 + +# %% +job_ads_date_count_sample_notexpired = defaultdict(int) +for file_name, job_id_list in tqdm(sample_dict.items()): + for job_id in job_id_list: + if job_id not in jobs_expired_job_ids: + v = tk_dates.get(job_id) + if v: + date = v[0:7] + job_ads_date_count_sample_notexpired[date] += 1 + else: + job_ads_date_count_sample_notexpired["No date given"] += 1 + +# %% +num_dates_notexpired = find_num_dates(job_ads_date_count_notexpired) +num_dates_sample_notexpired = find_num_dates(job_ads_date_count_sample_notexpired) + +# %% +plt.figure(figsize=(10, 4)) +plt.bar( + num_dates_notexpired.keys(), + [v / sum(num_dates_notexpired.values()) for v in num_dates_notexpired.values()], + width=0.1, + alpha=0.5, + label="All data", +) +plt.bar( + num_dates_sample_notexpired.keys(), + [v / sum(num_dates_sample_notexpired.values()) for v in num_dates_sample_notexpired.values()], + width=0.1, + color="red", + alpha=0.5, + label="Sample of data", +) +plt.legend() +plt.title("Comparison not including the expired data files") +plt.xlabel("Date of job advert (2014 = no date given)") +plt.ylabel("Proportion") +plt.savefig("../../../outputs/tk_analysis/tk_sample_dates_no_expired.pdf") # %%