without haste but without rest
Spark Streaming tutorial 본문
A Quick Example
특정 포트로 들어오는 텍스트를 분리해서 카운팅 하는 예제
# network_wordcount.py
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# Create a local StreamingContext with two working thread and batch interval of 1 second
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)
# Create a DStream that will connect to hostname:port, like localhost:9999
lines = ssc.socketTextStream("localhost", 9999)
# Split each line into words
words = lines.flatMap(lambda line: line.split(" "))
# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
# Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.pprint()
ssc.start() # Start the computation
ssc.awaitTermination() # Wait for the computation to terminate
Netcat
TCP, UDP 프로토콜을 사용해서 네트워크를 연결하고 데이터를 읽고 쓰는 유틸리티 프로그램
nc -lk 9999
작성한 스크립트를 실행한다.
./bin/spark-submit examples/src/main/python/streaming/network_wordcount.py
nc 커맨드라인에서 입력한 텍스트를 분리해서 카운팅 한다.
'Data Engineering & DataOps > Spark' 카테고리의 다른 글
IntelliJ에서 Scala로 Spark 프로젝트 시작하기 (0) | 2022.02.18 |
---|---|
Zeppelin 커스텀 컨테이너 이미지 빌드 (0) | 2022.02.08 |
Spark 개념과 특징 (0) | 2021.12.10 |
spark2 - pyspark TypeError: an integer is required (got type bytes) & zeppelin pyspark is not responding (0) | 2021.08.05 |
Comments