Skip to content

Commit

Permalink
非常非常关键的改变!
Browse files Browse the repository at this point in the history
  • Loading branch information
confucianzuoyuan committed May 25, 2020
1 parent d5f33c0 commit dee302d
Show file tree
Hide file tree
Showing 23 changed files with 9,864 additions and 11,805 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package com.atguigu.course

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat

// 需求:每个用户的点击Join这个用户最近10分钟内的浏览

// 数据流clickStream
// 某个用户在某个时刻点击了某个页面
// {"userID": "user_2", "eventTime": "2019-11-16 17:30:02", "eventType": "click", "pageID": "page_1"}

// 数据流browseStream
// 某个用户在某个时刻浏览了某个商品,以及商品的价值
// {"userID": "user_2", "eventTime": "2019-11-16 17:30:01", "eventType": "browse", "productID": "product_1", "productPrice": 10}
object IntervalJoinExample {

case class UserClickLog(userID: String,
eventTime: String,
eventType: String,
pageID: String)

case class UserBrowseLog(userID: String,
eventTime: String,
eventType: String,
productID: String,
productPrice: String)

def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val clickStream = env
.fromElements(
UserClickLog("user_2", "2019-11-16 17:30:00", "click", "page_1")
)
.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor[UserClickLog](Time.seconds(0)) {
override def extractTimestamp(t: UserClickLog): Long = {
val dateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
val dateTime = DateTime.parse(t.eventTime, dateTimeFormatter)
dateTime.getMillis
}
}
)

val browseStream = env
.fromElements(
UserBrowseLog("user_2", "2019-11-16 17:19:00", "browse", "product_1", "10"),
UserBrowseLog("user_2", "2019-11-16 17:20:00", "browse", "product_1", "10"),
UserBrowseLog("user_2", "2019-11-16 17:22:00", "browse", "product_1", "10"),
UserBrowseLog("user_2", "2019-11-16 17:26:00", "browse", "product_1", "10"),
UserBrowseLog("user_2", "2019-11-16 17:30:00", "browse", "product_1", "10"),
UserBrowseLog("user_2", "2019-11-16 17:31:00", "browse", "product_1", "10")
)
.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor[UserBrowseLog](Time.seconds(0)) {
override def extractTimestamp(t: UserBrowseLog): Long = {
val dateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
val dateTime = DateTime.parse(t.eventTime, dateTimeFormatter)
dateTime.getMillis
}
}
)

clickStream
.keyBy("userID")
.intervalJoin(browseStream.keyBy("userID"))
.between(Time.minutes(-10),Time.seconds(0))
.process(new MyIntervalJoin)
.print()

env.execute()
}

class MyIntervalJoin extends ProcessJoinFunction[UserClickLog, UserBrowseLog, String] {
override def processElement(left: UserClickLog,
right: UserBrowseLog,
context: ProcessJoinFunction[UserClickLog, UserBrowseLog, String]#Context,
out: Collector[String]): Unit = {
out.collect(left +" =Interval Join=> "+right)
}
}
}
Binary file added bin/__pycache__/preproc_config.cpython-37.pyc
Binary file not shown.
83 changes: 83 additions & 0 deletions bin/preproc
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
#!/usr/bin/env python3

"""
Concatenates a number of input files into a single output file, while
performing the following regex substitutions:
[[pagebreak]]
[nobr[s]] # Substitute spaces with `\ ` to mark as nonbreaking
# Doesn't work inside code markdown, alas
[nh[x]] \hyphenation{x} # no hyphen, no underscores allowed
[ix[x]] \index{x} # index straight up
[ixtt[x]] \index{x@\texttt{x}} # index tt
fl = footnote link
flx = footnote link to example https://beej.us/guide/bgnet/examples/file
flr = footnote link to redirect https://beej.us/guide/url/id
[fl[link|url]] [link](url)^[url]
[flx[link|file]] [link](https://beej.us/guide/bgnet/examples/file)^[https://beej.us/guide/bgnet/examples/file]
[flr[link|id]] [link](https://beej.us/guide/url/id)^[https://beej.us/guide/url/id]
[flrfc[link|num]] [link](https://tools.ietf.org/html/rfcnum)^[https://tools.ietf.org/html/rfcnum]
Also puts a blank line between files.
"""

import sys
import re
import preproc_config

if len(sys.argv) < 3:
print("usage: preproc infile [infile ... ] outputfile", file=sys.stdout)
sys.exit(1)

infiles = sys.argv[1:-1]
outfile = sys.argv[-1]

filedata = []

def nobr_replace(mo):
return re.sub(r'\s', r'\ ', mo.group(1))

for infile in infiles:
fin = open(infile)
filedata.append(fin.read())
fin.close()

filedata = '\n'.join(filedata)

filedata = re.sub(r'\t', " ", filedata, flags=re.DOTALL)
filedata = re.sub(r'\[nobr\[(.+?)\]\]', nobr_replace, filedata, flags=re.DOTALL)
filedata = re.sub(r'\[\[pagebreak\]\]', r'\\newpage', filedata, flags=re.DOTALL)
filedata = re.sub(r'\[nh\[(.+?)\]\]', r'\\hyphenation{\1}', filedata, flags=re.DOTALL)
filedata = re.sub(r'\[ix\[(.+?)\]\]', r'\\index{\1}', filedata, flags=re.DOTALL)
filedata = re.sub(r'\[ixtt\[(.+?)\]\]', r'\\index{\1@\\texttt{\1}}', filedata, flags=re.DOTALL)
filedata = re.sub(r'\[fl\[(.+?)\|(.+?)\]\]', r'[\1](\2)^[\2]', filedata, flags=re.DOTALL)
filedata = re.sub(r'\[flx\[(.+?)\|(.+?)\]\]', r'[\1](' + preproc_config.EXAMPLE_URL + r'\2)^[' + preproc_config.EXAMPLE_URL + r'\2]', filedata, flags=re.DOTALL)
filedata = re.sub(r'\[flr\[(.+?)\|(.+?)\]\]', r'[\1](https://beej.us/guide/url/\2)^[https://beej.us/guide/url/\2]', filedata, flags=re.DOTALL)
filedata = re.sub(r'\[flrfc\[(.+?)\|(.+?)\]\]', r'[\1](https://tools.ietf.org/html/rfc\2)^[https://tools.ietf.org/html/rfc\2]', filedata, flags=re.DOTALL)

fout = open(outfile, "w")
in_fence = False
this_line_fence = False
number_lines = False

# Go through a line at a time indenting if we're in unnumbered fenced code
for line in filedata.splitlines(True):
if line.strip()[:3] == '```':
number_lines = line.lower().find("numberlines") != -1
this_line_fence = True
in_fence = not in_fence
else:
this_line_fence = False

if in_fence and not this_line_fence and not number_lines:
fout.write(" ") # indent

fout.write(line)

fout.close()

1 change: 1 addition & 0 deletions bin/preproc_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
EXAMPLE_URL = "https://beej.us/guide/bgnet/examples/"
174 changes: 174 additions & 0 deletions docs/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
TITLE="尚硅谷Flink教程"
SUBTITLE="Flink理论与项目实践"
AUTHOR='尚硅谷大数据教学组'
VERSION_DATE="v3.1.2, Copyright © November 13, 2019"

GUIDE_ID=flinktutorial

PDF_MAINFONT="Liberation Serif"
PDF_SANSFONT="Liberation Sans"
PDF_MONOFONT="Liberation Mono"
#PDF_MAINFONT="DejaVu Serif"
#PDF_SANSFONT="DejaVu Sans"
#PDF_MONOFONT="DejaVu Sans Mono"

USLETTER_COLOR=$(GUIDE_ID)_usl_c_1.pdf $(GUIDE_ID)_usl_c_2.pdf
USLETTER_BW=$(GUIDE_ID)_usl_bw_1.pdf $(GUIDE_ID)_usl_bw_2.pdf
A4_COLOR=$(GUIDE_ID)_a4_c_1.pdf $(GUIDE_ID)_a4_c_2.pdf
A4_BW=$(GUIDE_ID)_a4_bw_1.pdf $(GUIDE_ID)_a4_bw_2.pdf
BOOKS=$(A4_COLOR)

HTML=$(GUIDE_ID).html

PREPROC=../bin/preproc
PREPROC_MD=$(GUIDE_ID)_temp_preproc.md

COMMON_OPTS= \
--variable title:$(TITLE) \
--variable subtitle:$(SUBTITLE) \
--variable author:$(AUTHOR) \
--variable date:$(VERSION_DATE) \
--toc

PDF_OPTS= \
-H latex/header_index.latex \
-A latex/after_index.latex \
--pdf-engine=xelatex \
--variable mainfont=$(PDF_MAINFONT) \
--variable sansfont=$(PDF_SANSFONT) \
--variable monofont=$(PDF_MONOFONT) \
--variable geometry:"top=1in,bottom=1in" \
-V documentclass=ctexbook \
-o $(GUIDE_ID)_temp.tex \
$(COMMON_OPTS)

HTML_OPTS=$(COMMON_OPTS) \
--metadata title:$(TITLE)

ONESIDE=--variable classoption:oneside
TWOSIDE=--variable classoption:twoside
USLETTER=--variable papersize:letter
A4=--variable papersize:a4
CROWNQUARTO=--variable geometry:"paperwidth=7.444in,paperheight=9.681in,top=1in,bottom=1in,left=1in,right=1.5in" # Lulu press
CROWNQUARTO_AMAZON=--variable geometry:"paperwidth=7.444in,paperheight=9.681in,top=1in,bottom=1in,left=1.25in,right=1.25in" # Amazon
#SIZE_75x925_AMAZON=--variable geometry:"paperwidth=7.5in,paperheight=9.25in,top=1in,bottom=1in,left=1.125in,right=1.375in" # Amazon 7.5" x 9.25", margins too far inside
SIZE_75x925_AMAZON=--variable geometry:"paperwidth=7.5in,paperheight=9.25in,top=1in,bottom=1in,left=1.25in,right=1.25in" # Amazon 7.5" x 9.25"
BLANKLAST=-A latex/after_blank.latex # add a blank last page
BW=--no-highlight # black and white options

all: $(HTML) $(BOOKS)

$(GUIDE_ID).html: $(GUIDE_ID).md bg-css.html
$(PREPROC) $< $(PREPROC_MD)
pandoc $(HTML_OPTS) -s $(PREPROC_MD) -o $(GUIDE_ID).html -H bg-css.html
sed 's/src="\(.*\)\.pdf"/src="\1.svg"/g' $(GUIDE_ID).html > $(GUIDE_ID)_temp.html # use svg images
mv $(GUIDE_ID)_temp.html $(GUIDE_ID).html
rm -f $(GUIDE_ID)_temp*

$(GUIDE_ID)_usl_c_1.pdf: $(GUIDE_ID).md
$(PREPROC) $< $(PREPROC_MD)
pandoc $(PDF_OPTS) $(USLETTER) $(ONESIDE) $(PREPROC_MD)
xelatex $(GUIDE_ID)_temp.tex
makeindex $(GUIDE_ID)_temp.idx
xelatex $(GUIDE_ID)_temp.tex
xelatex $(GUIDE_ID)_temp.tex
mv $(GUIDE_ID)_temp.pdf $@
rm -f $(GUIDE_ID)_temp*

$(GUIDE_ID)_usl_c_2.pdf: $(GUIDE_ID).md
$(PREPROC) $< $(PREPROC_MD)
pandoc $(PDF_OPTS) $(USLETTER) $(TWOSIDE) $(PREPROC_MD)
xelatex $(GUIDE_ID)_temp.tex
makeindex $(GUIDE_ID)_temp.idx
xelatex $(GUIDE_ID)_temp.tex
xelatex $(GUIDE_ID)_temp.tex
mv $(GUIDE_ID)_temp.pdf $@
rm -f $(GUIDE_ID)_temp*

$(GUIDE_ID)_a4_c_1.pdf: $(GUIDE_ID).md
$(PREPROC) $< $(PREPROC_MD)
pandoc $(PDF_OPTS) $(A4) $(ONESIDE) $(PREPROC_MD)
xelatex $(GUIDE_ID)_temp.tex
makeindex $(GUIDE_ID)_temp.idx
xelatex $(GUIDE_ID)_temp.tex
xelatex $(GUIDE_ID)_temp.tex
mv $(GUIDE_ID)_temp.pdf $@
rm -f $(GUIDE_ID)_temp*

$(GUIDE_ID)_a4_c_2.pdf: $(GUIDE_ID).md
$(PREPROC) $< $(PREPROC_MD)
pandoc $(PDF_OPTS) $(A4) $(TWOSIDE) $(PREPROC_MD)
xelatex $(GUIDE_ID)_temp.tex
makeindex $(GUIDE_ID)_temp.idx
xelatex $(GUIDE_ID)_temp.tex
xelatex $(GUIDE_ID)_temp.tex
mv $(GUIDE_ID)_temp.pdf $@
rm -f $(GUIDE_ID)_temp*

$(GUIDE_ID)_usl_bw_1.pdf: $(GUIDE_ID).md
$(PREPROC) $< $(PREPROC_MD)
pandoc $(PDF_OPTS) $(USLETTER) $(ONESIDE) $(BW) $(PREPROC_MD)
xelatex $(GUIDE_ID)_temp.tex
makeindex $(GUIDE_ID)_temp.idx
xelatex $(GUIDE_ID)_temp.tex
xelatex $(GUIDE_ID)_temp.tex
mv $(GUIDE_ID)_temp.pdf $@
rm -f $(GUIDE_ID)_temp*

$(GUIDE_ID)_usl_bw_2.pdf: $(GUIDE_ID).md
$(PREPROC) $< $(PREPROC_MD)
pandoc $(PDF_OPTS) $(USLETTER) $(TWOSIDE) $(BW) $(PREPROC_MD)
xelatex $(GUIDE_ID)_temp.tex
makeindex $(GUIDE_ID)_temp.idx
xelatex $(GUIDE_ID)_temp.tex
xelatex $(GUIDE_ID)_temp.tex
mv $(GUIDE_ID)_temp.pdf $@
rm -f $(GUIDE_ID)_temp*

$(GUIDE_ID)_a4_bw_1.pdf: $(GUIDE_ID).md
$(PREPROC) $< $(PREPROC_MD)
pandoc $(PDF_OPTS) $(A4) $(ONESIDE) $(BW) $(PREPROC_MD)
xelatex $(GUIDE_ID)_temp.tex
makeindex $(GUIDE_ID)_temp.idx
xelatex $(GUIDE_ID)_temp.tex
xelatex $(GUIDE_ID)_temp.tex
mv $(GUIDE_ID)_temp.pdf $@
rm -f $(GUIDE_ID)_temp*

$(GUIDE_ID)_a4_bw_2.pdf: $(GUIDE_ID).md
$(PREPROC) $< $(PREPROC_MD)
pandoc $(PDF_OPTS) $(A4) $(TWOSIDE) $(BW) $(PREPROC_MD)
xelatex $(GUIDE_ID)_temp.tex
makeindex $(GUIDE_ID)_temp.idx
xelatex $(GUIDE_ID)_temp.tex
xelatex $(GUIDE_ID)_temp.tex
mv $(GUIDE_ID)_temp.pdf $@
rm -f $(GUIDE_ID)_temp*

$(GUIDE_ID)_lulu.pdf: $(GUIDE_ID)_lulu.md
$(PREPROC) $< $(PREPROC_MD)
pandoc $(PDF_OPTS) $(TWOSIDE) $(CROWNQUARTO) $(BLANKLAST) $(PREPROC_MD)
xelatex $(GUIDE_ID)_temp.tex
makeindex $(GUIDE_ID)_temp.idx
xelatex $(GUIDE_ID)_temp.tex
xelatex $(GUIDE_ID)_temp.tex
mv $(GUIDE_ID)_temp.pdf $@
rm -f $(GUIDE_ID)_temp*

$(GUIDE_ID)_amazon.pdf: $(GUIDE_ID)_amazon.md
$(PREPROC) $< $(PREPROC_MD)
pandoc $(PDF_OPTS) $(TWOSIDE) $(SIZE_75x925_AMAZON) $(BLANKLAST) $(PREPROC_MD)
xelatex $(GUIDE_ID)_temp.tex
makeindex $(GUIDE_ID)_temp.idx
xelatex $(GUIDE_ID)_temp.tex
xelatex $(GUIDE_ID)_temp.tex
mv $(GUIDE_ID)_temp.pdf $@
rm -f $(GUIDE_ID)_temp*

clean:
rm -f $(GUIDE_ID)_temp*

pristine: clean
rm -f $(HTML) $(BOOKS)

.PHONY: all, html, clean, pristine
Loading

0 comments on commit dee302d

Please sign in to comment.