Apache Spark技术实战(二)KafkaWordCount &PackratParsers实例 &Spark Cassandra Connector的安装和使用

Connector,mysql connector c++,jvm调优,.dll文件 ,www.515158.com

Apache Spark技术实战(二)KafkaWordCount &PackratParsers实例 &Spark Cassandra Connector的安装和使用

<一>KafkaWordCount

概要

Spark应用开发实践性非常强,很多时候可能都会将时间花费在环境的搭建和运行上,如果有一个比较好的指导将会大大的缩短应用开发流程。Spark Streaming中涉及到和许多第三方程序的整合,源码中的例子如何真正跑起来,文档不是很多也不详细。

本篇主要讲述如何运行KafkaWordCount,这个需要涉及Kafka集群的搭建,还是说的越仔细越好。

搭建Kafka集群

步骤1:下载kafka 0.8.1及解压

wget https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.1.1/kafka_2.10-0.8.1.1.tgz
tar zvxf kafka_2.10-0.8.1.1.tgz
cd kafka_2.10-0.8.1.1

步骤2:启动zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties

步骤3:修改配置文件config/server.properties,添加如下内容

host.name=localhost

# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured.  Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
advertised.host.name=localhost

步骤4:启动Kafka server

bin/kafka-server-start.sh config/server.properties

步骤5:创建topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1  --topic test

检验topic创建是否成功

bin/kafka-topics.sh --list --zookeeper localhost:2181

如果正常返回test

步骤6:打开producer,发送消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
##启动成功后,输入以下内容测试
This is a message
This is another message

 步骤7:打开consumer,接收消息

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
###启动成功后,如果一切正常将会显示producer端输入的内容
This is a message
This is another message

运行KafkaWordCount

KafkaWordCount源文件位置 :examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala

尽管里面有使用说明,见下文,但如果不是事先对Kafka有一定的了解的话,决然不知道这些参数是什么意思,也不知道该如何填写

/**
 * Consumes messages from one or more topics in Kafka and does wordcount.
 * Usage: KafkaWordCount    
 *    is a list of one or more zookeeper servers that make quorum
 *    is the name of kafka consumer group
 *    is a list of one or more kafka topics to consume from
 *    is the number of threads the kafka consumer should use
 *
 * Example:
 *    `$ bin/run-example 
 *      org.apache.spark.examples.streaming.KafkaWordCount zoo01,zoo02,zoo03 
 *      my-consumer-group topic1,topic2 1`
 */
object KafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCount    ")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    val ssc =  new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint("checkpoint")

    val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L))
      .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

讲清楚了写这篇博客的主要原因之后,来看一看该如何运行KafkaWordCount

步骤1:停止运行刚才的kafka-console-producer和kafka-console-consumer

步骤2:运行KafkaWordCountProducer

bin/run-example org.apache.spark.examples.streaming.KafkaWordCountProducer localhost:9092 test 3 5

解释一下参数的意思,localhost:9092表示producer的地址和端口, test表示topic,3表示每秒发多少条消息,5表示每条消息中有几个单词

步骤3:运行KafkaWordCount

 bin/run-example org.apache.spark.examples.streaming.KafkaWordCount localhost:2181 test-consumer-group test 1

解释一下参数, localhost:2181表示zookeeper的监听地址,test-consumer-group表示consumer-group的名称,必须和$KAFKA_HOME/config/consumer.properties中的group.id的配置内容一致,test表示topic,1表示线程数。

<二>PackratParsers实例

概要

通过一个简明的Demo程序来说明如何使用scala中的PackratParsers

DemoApp

import scala.util.parsing.combinator.PackratParsers
import scala.util.parsing.combinator.syntactical._

object Dotter extends StandardTokenParsers with PackratParsers {
    //定义分割符
    lexical.delimiters ++= List(".",";","+","-","*")
    //合法的输入模式,支持加,减,乘
    lazy val pgm : PackratParser[Int] = expr | minus|multiply
    //定义模式加
    lazy val expr :PackratParser[Int]= num~"+"~num ^^ {case n1~"+"~n2 => n1.toInt + n2.toInt}
    //定义模式减
    lazy val minus :PackratParser[Int]= num~"-"~num ^^ {case n1~"-"~n2 => n1.toInt - n2.toInt}
    lazy val multiply :PackratParser[Int]= num~"*"~num ^^ {case n1~"*"~n2 => n1.toInt * n2.toInt}
    lazy val num = numericLit

    def parse(input: String) =
    phrase(pgm)(new PackratReader(new lexical.Scanner(input))) match {
      case Success(result, _) => println("Success!"); println(result);Some(result)
      case n @ _ => println(n);println("bla"); None
    }  

    def main(args: Array[String]) {
      //定义list,::表示添加,Nil表示list结束
      val prg = "12*2"::"24-4"::"3+5"::Nil
      prg.map(parse)
    }
}

parser中的表达式说明

A<~B 只保留左侧内容 A<~B 只保留A
A~>B 只保留右侧内容 A~>B 只保留B
^^ 根据匹配结果生成语法短语
^^^ 将语法短语转换成为另外的值,注意与^^的区别
~ 连接符 A ̃B 表示模式匹配是B紧跟于A之后
| 或者 A|B 表示模式要么由A组成,要么由B组成


编译执行

将上述源码保存到文件dotter.scala。

编译

scalac dotter.scala

执行

scala -cp . Dotter

<三>Spark Cassandra Connector的安装和使用

前提

假设当前已经安装好如下软件:

  • jdk
  • sbt
  • git
  • scala

安装cassandra

以archlinux为例,使用如下指令来安装Cassandra:

yaourt -S cassandra

启动cassandra:

cassandra -f

创建keyspace和table, 运行/usr/bin/cqlsh进入cql console,然后执行下述语句创建keyspace和table:

CREATE KEYSPACE test WITH replication = {class: SimpleStrategy, replication_factor: 1 };
CREATE TABLE test.kv(key text PRIMARY KEY, value int);

 添加记录,继续使用cql console:

INSERT INTO test.kv(key, value) VALUES (key1, 1);
INSERT INTO test.kv(key, value) VALUES (key2, 2);

验证记录已经插入成功,执行如下cql:

select * from test.kv;

下载编译spark-cassandra-connector

下载最新的spark-cassandra-connector源码:

git clone https://github.com/datastax/spark-cassandra-connector.git

编译:

sbt package

这中间要等待比较长的时间,请保持足够的耐心。

运行spark-shell

首先请确保cassandra已经正常安装和运行,如有问题请返回开始的章节安装Cassandra。

如何添加相应的library来支持spark-cassandra-connector,并没有一个明确的文档说明,折腾了一个下午,终于弄出了一个最简的配置。

bin/spark-shell --driver-class-path /root/working/spark-cassandra-connector/spark-cassandra-connector/target/scala-2.10/spark-cassandra-connector_2.10-1.1.0-SNAPSHOT.jar:
/root/.ivy2/cache/org.apache.cassandra/cassandra-thrift/jars/cassandra-thrift-2.0.9.jar:
/root/.ivy2/cache/org.apache.thrift/libthrift/jars/libthrift-0.9.1.jar:
/root/.ivy2/cache/org.apache.cassandra/cassandra-clientutil/jars/cassandra-clientutil-2.0.9.jar:
/root/.ivy2/cache/com.datastax.cassandra/cassandra-driver-core/jars/cassandra-driver-core-2.0.4.jar:
/root/.ivy2/cache/io.netty/netty/bundles/netty-3.9.0.Final.jar:
/root/.ivy2/cache/com.codahale.metrics/metrics-core/bundles/metrics-core-3.0.2.jar:
/root/.ivy2/cache/org.slf4j/slf4j-api/jars/slf4j-api-1.7.7.jar:
/root/.ivy2/cache/org.apache.commons/commons-lang3/jars/commons-lang3-3.3.2.jar:
/root/.ivy2/cache/org.joda/joda-convert/jars/joda-convert-1.2.jar:
/root/.ivy2/cache/joda-time/joda-time/jars/joda-time-2.3.jar:
/root/.ivy2/cache/org.apache.cassandra/cassandra-all/jars/cassandra-all-2.0.9.jar:
/root/.ivy2/cache/org.slf4j/slf4j-log4j12/jars/slf4j-log4j12-1.7.2.jar

上述指令假设spark-cassandra-connector的源码是下载在$HOME/working目录下,请根据自己的情况作适当修改。

我是如何猜测到需要指定这些包依赖的呢?说白了,也很简单,就是执行以下指令,然后再查看相就的java进程中的运行参数。

#运行spark-cassandra-connector测试集
sbt test
sbt it:test

当上述指令还在运行的时候,使用ps来查看java运行的参数,这样就反过来知道所需要的包依赖了。

ps -ef|grep -i java

测试程序

由于spark-shell会默认创建sc,所以首先需要停止掉默认的sc,然后利用新的配置来创建可以连接到cassandra的sc,示例代码如下:

sc.stop
import com.datastax.spark.connector._
import org.apache.spark._
val conf = new SparkConf()
conf.set("spark.cassandra.connection.host", "127.0.0.1")
val sc = new SparkContext("local[2]", "Cassandra Connector Test", conf)
val table = sc.cassandraTable("test", "kv")
table.count

如果一切正常会显示出如下结果:

res3: Long = 2

小结

进入实战阶段,挑战会越来越多,保持足够的信心和耐心很重要。

本篇内容和实战一中的kafka cluster组织在一起的话,就会形成一个从前台到后台存储的完整处理链条。

OA  OA软件  OA系统  OA办公系统  协同OA软件  OA办公软件  开源OA  协同OA  PHPOA  oa  企业信用查询 图片地址  修改tomcat端口号  tomcat服务器部署  myeclipse部署tomcat  服务器安装tomcat  腾讯云主机  高级办公软件分数等级  安装时发生严重错误  0x80070643  Orchard CMS  webform mvc 表单  图像查找  sql2000 1053错误  ios 蓝牙  节能  设置谷歌翻译  谷歌自动翻译  谷歌地球  谷歌推送  谷歌地图  谷歌  打包安装  winform   visiual studio  visiual   #单片机  泛型 ,反射 c#  json  支付接口申请  C#读图像  matlab自定义函数  读取object变量中的数组  窗体应用程序  cruisecontrol 打包  百度地图 覆盖物  json minlength scala  scala  批量导出数据库  远程执行命令  命令技巧  Shell入门  使用范例  文件目录  mysql递归oracle  数据gsonjson  sdk无线驱动qcaopenwrtlinux  netae源码c#  java io  java.io  io输出流  io复用