Make webpages() consistent across aut and ARCH. (#539)
- Filter HTTP headers and HTML out of the content column on webpages() so that it is consistent with the app implementation and the ARCH implementation. (A usage sketch follows the file summary below.)
- Rename the content column to raw_content in all().
- Update PlainTextExtractor to use all(), since HTML is now removed from webpages() content.
- Add a domain column to all().
- Update the app's CSV exports so that they are RFC 4180 compliant.
- Run GitHub workflows on pushes and pull requests to the main branch.
- Apply consistent formatting in DataFrameLoader.scala.
- Update tests as needed.
- Update the Apache Spark version in the README.
- Resolves #538
ruebot authored May 30, 2022
1 parent 988f70f commit 9011c92
Showing 12 changed files with 116 additions and 71 deletions.
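
For orientation, here is a minimal usage sketch of the behavioural change; it is not taken from the commit itself. The archive path is hypothetical and `sc` is assumed to be an existing `SparkContext`.

```scala
import io.archivesunleashed._

val path = "/path/to/warcs/*.warc.gz" // hypothetical input path

// webpages(): the content column now has HTTP headers and HTML
// stripped, matching the app and ARCH implementations.
val pages = RecordLoader.loadArchives(path, sc).webpages()
pages.select("domain", "url", "content").show(3)

// all(): the unmodified record payload is now exposed as raw_content,
// alongside the new domain column.
val records = RecordLoader.loadArchives(path, sc).all()
records.select("domain", "url", "raw_content").show(3)
```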
2 changes: 2 additions & 0 deletions .github/workflows/mvn-build.yml
@@ -1,6 +1,8 @@
 name: Maven build
 
 on:
   push:
+    branches: [main]
   pull_request:
+    branches: [main]
 
2 changes: 1 addition & 1 deletion README.md
@@ -20,7 +20,7 @@ If you would like a more in-depth look at the project, please check out the foll
 - Java 11
 - Python 3.7.3+ (PySpark)
 - Scala 2.12+
-- Apache Spark 3.0.0+
+- Apache Spark (Hadoop 2.7) 3.0.3+
 
 More information on setting up dependencies can be found [here](https://aut.docs.archivesunleashed.org/docs/next/dependencies).
 
16 changes: 11 additions & 5 deletions src/main/scala/io/archivesunleashed/app/CommandLineApp.scala
@@ -20,7 +20,7 @@ import java.nio.file.{Files, Paths}
 
 import io.archivesunleashed.{ArchiveRecord, RecordLoader}
 import org.apache.log4j.Logger
-import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.sql.{DataFrame, Dataset, Row}
 import org.apache.spark.{SparkConf, SparkContext}
 import org.rogach.scallop.exceptions.ScallopException
 import org.rogach.scallop.ScallopConf
@@ -209,9 +209,9 @@ class CommandLineApp(conf: CmdAppConf) {
     "PlainTextExtractor" ->
       ((inputFiles: List[String]) => {
         var df =
-          RecordLoader.loadArchives(inputFiles.head, sparkCtx.get).webpages()
+          RecordLoader.loadArchives(inputFiles.head, sparkCtx.get).all()
         inputFiles.tail foreach { f =>
-          df = df.union(RecordLoader.loadArchives(f, sparkCtx.get).webpages())
+          df = df.union(RecordLoader.loadArchives(f, sparkCtx.get).all())
         }
         if (!configuration.outputFormat.isEmpty && configuration
           .outputFormat() == "parquet") {
@@ -327,12 +327,18 @@ class CommandLineApp(conf: CmdAppConf) {
       d.coalesce(configuration.partition())
         .write
         .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
+        .format("csv")
         .option("header", "true")
-        .csv(saveTarget)
+        .option("escape", "\"")
+        .option("encoding", "utf-8")
+        .save(saveTarget)
     } else {
       d.write
         .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
-        .csv(saveTarget)
+        .format("csv")
+        .option("escape", "\"")
+        .option("encoding", "utf-8")
+        .save(saveTarget)
     }
   }
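
Why the escape option matters for RFC 4180: Spark's CSV writer defaults to backslash-escaping embedded double quotes, while RFC 4180 requires doubling them. A minimal sketch of the effect, with a hypothetical output path:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq(("example.org", "He said \"hi\"")).toDF("domain", "raw_content")

df.write
  .format("csv")
  .option("header", "true")
  .option("escape", "\"") // writes "He said ""hi""" instead of "He said \"hi\""
  .option("encoding", "utf-8")
  .save("/tmp/rfc4180-sketch") // hypothetical output directory
```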

18 changes: 16 additions & 2 deletions src/main/scala/io/archivesunleashed/app/PlainTextExtractor.scala
@@ -19,19 +19,33 @@ package io.archivesunleashed.app
 import io.archivesunleashed.ArchiveRecord
 import io.archivesunleashed.udfs.{extractBoilerpipeText}
 import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
+import org.apache.spark.sql.functions.lower
 
 object PlainTextExtractor {
 
   /** Extract plain text from web archive using DataFrame and Spark SQL.
     *
     * @param d DataFrame obtained from RecordLoader
-    * @return Dataset[Row], where the schema is (crawl date, domain, url, text)
+    * @return Dataset[Row], where the schema is (content)
     */
   def apply(d: DataFrame): Dataset[Row] = {
     val spark = SparkSession.builder().master("local").getOrCreate()
     // scalastyle:off
     import spark.implicits._
     // scalastyle:on
-    d.select(extractBoilerpipeText($"content").as("content"))
+    d.filter($"crawl_date" isNotNull)
+      .filter(
+        !($"url".rlike(".*robots\\.txt$")) &&
+          ($"mime_type_web_server".rlike("text/html") ||
+            $"mime_type_web_server".rlike("application/xhtml+xml") ||
+            $"url".rlike("(?i).*htm$") ||
+            $"url".rlike("(?i).*html$"))
+      )
+      .filter($"http_status_code" === 200)
+      .filter(
+        !(lower($"url").startsWith("filedesc:"))
+          && (!(lower($"url").startsWith("dns:")))
+      )
+      .select(extractBoilerpipeText($"raw_content").as("content"))
   }
 }
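
Since the web-page filtering now lives inside the extractor, it can consume the unfiltered all() DataFrame directly. A sketch under the same assumptions as above (hypothetical path, existing `sc`):

```scala
import io.archivesunleashed._
import io.archivesunleashed.app.PlainTextExtractor

val df = RecordLoader
  .loadArchives("/path/to/warcs/*.warc.gz", sc)
  .all()

// Keeps 200-status HTML pages, drops robots.txt and the
// filedesc:/dns: pseudo-records, then boilerpipes raw_content.
PlainTextExtractor(df).show(5)
```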
src/main/scala/io/archivesunleashed/app/WebPagesExtractor.scala
@@ -40,12 +40,12 @@ object WebPagesExtractor {
     // scalastyle:on
     d.select(
       $"crawl_date",
-      removePrefixWWW(extractDomain($"url")).as("domain"),
+      $"domain",
       $"url",
       $"mime_type_web_server",
       $"mime_type_tika",
       $"language",
-      removeHTML(removeHTTPHeader(($"content"))).alias("content")
+      $"content"
     )
   }
 }
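
With domain and the cleaned content precomputed in webpages(), the extractor reduces to a plain column selection. A corresponding sketch (same assumptions as above):

```scala
import io.archivesunleashed._
import io.archivesunleashed.app.WebPagesExtractor

val pages = RecordLoader
  .loadArchives("/path/to/warcs/*.warc.gz", sc)
  .webpages()

// Selects crawl_date, domain, url, MIME types, language,
// and the already-cleaned content column.
WebPagesExtractor(pages).show(5)
```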
24 changes: 18 additions & 6 deletions src/main/scala/io/archivesunleashed/df/DataFrameLoader.scala
@@ -32,7 +32,9 @@ class DataFrameLoader(sc: SparkContext) {
 
   /** Create a DataFrame with audio url, filename, extension, mime_type_web_server, mime_type_tika, md5, sha1, and raw bytes. */
   def audio(path: String): DataFrame = {
-    RecordLoader.loadArchives(path, sc).audio
+    RecordLoader
+      .loadArchives(path, sc)
+      .audio()
   }
 
   /* Create a DataFrame with crawl date, source page, image url, and alt text. */
@@ -51,22 +53,30 @@
 
   /** Create a DataFrame with PDF url, filename, extension, mime_type_web_server, mime_type_tika, md5, sha1, and raw bytes. */
   def pdfs(path: String): DataFrame = {
-    RecordLoader.loadArchives(path, sc).pdfs
+    RecordLoader
+      .loadArchives(path, sc)
+      .pdfs()
   }
 
   /** Create a DataFrame with presentation program file url, filename, extension, mime_type_web_server, mime_type_tika, md5, sha1, and raw bytes. */
   def presentationProgramFiles(path: String): DataFrame = {
-    RecordLoader.loadArchives(path, sc).presentationProgramFiles
+    RecordLoader
+      .loadArchives(path, sc)
+      .presentationProgramFiles()
   }
 
   /** Create a DataFrame with spreadsheet url, filename, extension, mime_type_web_server, mime_type_tika, md5, sha1, and raw bytes. */
   def spreadsheets(path: String): DataFrame = {
-    RecordLoader.loadArchives(path, sc).spreadsheets
+    RecordLoader
+      .loadArchives(path, sc)
+      .spreadsheets()
   }
 
   /** Create a DataFrame with video url, filename, extension, mime_type_web_server, mime_type_tika, md5, sha1, and raw bytes. */
   def videos(path: String): DataFrame = {
-    RecordLoader.loadArchives(path, sc).videos
+    RecordLoader
+      .loadArchives(path, sc)
+      .videos()
   }
 
   /** Create a DataFrame with crawl_date, source, destination, and anchor. */
@@ -85,6 +95,8 @@
 
   /** Create a DataFrame with word processor file url, filename, extension, mime_type_web_server, mime_type_tika, md5, sha1, and raw bytes. */
   def wordProcessorFiles(path: String): DataFrame = {
-    RecordLoader.loadArchives(path, sc).wordProcessorFiles
+    RecordLoader
+      .loadArchives(path, sc)
+      .wordProcessorFiles()
   }
 }
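
DataFrameLoader is the single entry point that non-Scala front ends (for example, PySpark via the JVM gateway) use to reach these derivative DataFrames; the explicit empty parentheses presumably match how the underlying methods are declared. A usage sketch with a hypothetical path:

```scala
import io.archivesunleashed.df.DataFrameLoader

val loader = new DataFrameLoader(sc) // sc: an existing SparkContext
val audio = loader.audio("/path/to/warcs/*.warc.gz")
val pdfs = loader.pdfs("/path/to/warcs/*.warc.gz")
audio.printSchema()
```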
8 changes: 5 additions & 3 deletions src/main/scala/io/archivesunleashed/package.scala
@@ -41,7 +41,7 @@ import org.apache.commons.codec.binary.Hex
 import org.apache.commons.io.FilenameUtils
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.functions.{lit, udf}
+import org.apache.spark.sql.functions.{lit, lower, udf}
 import org.apache.spark.sql.types.{
   BinaryType,
   IntegerType,
@@ -170,6 +170,7 @@ package object archivesunleashed {
       .map(r =>
         Row(
           r.getCrawlDate,
+          ExtractDomain(r.getUrl).replaceAll("^\\s*www\\.", ""),
           r.getUrl,
           r.getMimeType,
           DetectMimeTypeTika(r.getBinaryBytes),
@@ -182,10 +183,11 @@
 
       val schema = new StructType()
         .add(StructField("crawl_date", StringType, true))
+        .add(StructField("domain", StringType, true))
         .add(StructField("url", StringType, true))
         .add(StructField("mime_type_web_server", StringType, true))
         .add(StructField("mime_type_tika", StringType, true))
-        .add(StructField("content", StringType, true))
+        .add(StructField("raw_content", StringType, true))
         .add(StructField("bytes", BinaryType, true))
         .add(StructField("http_status_code", StringType, true))
         .add(StructField("archive_filename", StringType, true))
@@ -228,7 +230,7 @@
           r.getMimeType,
           DetectMimeTypeTika(r.getBinaryBytes),
           DetectLanguage(RemoveHTML(RemoveHTTPHeader(r.getContentString))),
-          r.getContentString
+          RemoveHTML(RemoveHTTPHeader(r.getContentString))
         )
       )

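Read together, the hunks above suggest the following post-change schema for all() (column names taken from the StructType in the diff; further columns may be hidden by the collapsed context):

```scala
import io.archivesunleashed._

// Assumed columns: crawl_date, domain, url, mime_type_web_server,
// mime_type_tika, raw_content, bytes, http_status_code, archive_filename
RecordLoader
  .loadArchives("/path/to/warcs/*.warc.gz", sc) // hypothetical path
  .all()
  .select("domain", "url", "http_status_code")
  .show(3)
```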
14 changes: 8 additions & 6 deletions src/test/scala/io/archivesunleashed/RecordDFTest.scala
@@ -34,7 +34,7 @@ import io.archivesunleashed.udfs.{
 }
 import com.google.common.io.Resources
 import org.apache.spark.sql.functions.lit
-import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
 import org.apache.spark.{SparkConf, SparkContext}
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
@@ -61,7 +61,7 @@ class RecordDFTest extends FunSuite with BeforeAndAfter {
       .loadArchives(arcPath, sc)
       .all()
       .keepValidPagesDF()
-      .take(1)(0)(1)
+      .take(2)(0)(2)
 
     assert(base.toString == expected)
   }
@@ -180,8 +180,10 @@ class RecordDFTest extends FunSuite with BeforeAndAfter {
     val base = RecordLoader
       .loadArchives(arcPath, sc)
       .all()
-      .select($"url", $"content")
-      .filter(hasContent($"content", lit(Array("Content-Length: [0-9]{4}"))))
+      .select($"url", $"raw_content")
+      .filter(
+        hasContent($"raw_content", lit(Array("Content-Length: [0-9]{4}")))
+      )
       .take(1)(0)(0)
 
     assert(base.toString == expected)
@@ -223,10 +225,10 @@ class RecordDFTest extends FunSuite with BeforeAndAfter {
     val base = RecordLoader
       .loadArchives(arcPath, sc)
       .all()
-      .select(detectLanguage(removeHTML($"content")).as("language"))
+      .select(detectLanguage(removeHTML($"raw_content")).as("language"))
       .filter(
         hasLanguages(
-          detectLanguage(removeHTML($"content")),
+          detectLanguage(removeHTML($"raw_content")),
           lit(Array("de", "ht"))
         )
       )
src/test/scala/io/archivesunleashed/app/PlainTextExtractorTest.scala
@@ -39,7 +39,7 @@ class PlainTextExtractorTest extends FunSuite with BeforeAndAfter {
   }
 
   test("Plain text extractor") {
-    val df = RecordLoader.loadArchives(arcPath, sc).webpages()
+    val df = RecordLoader.loadArchives(arcPath, sc).all()
     val dfResults = PlainTextExtractor(df).collect()
     val RESULTSLENGTH = 94
 