1500字范文,内容丰富有趣,写作好帮手!
1500字范文 > Kafka日记(五)RdKafka文档翻译

Kafka日记(五)RdKafka文档翻译

时间:2019-03-06 02:03:03

相关推荐

Kafka日记(五)RdKafka文档翻译

RdKafka文档翻译

出于使用 , 特做简单翻译 ,仅做参考。

https://arnaud-lb.github.io/php-rdkafka/phpdoc/book.rdkafka.html

函数string rd_kafka_err2str ( integer $err ) 将rdkafka错误代码转换为字符串integer rd_kafka_errno2err ( integer $errnox ) 将系统errno转换为Kafka错误代码integer rd_kafka_errno ( void ) 返回系统errnointeger rd_kafka_offset_tail ( integer $cnt ) 返回一个特殊的偏移量值,该值可用于在主题尾部之前开始使用cnt消息RdKafka\KafkaConsume类这是高水平消费者,支持自动分区/撤销(pecl rdkafka>=1.0.0,librdkafka>=0.9)1)public void RdKafka\KafkaConsumer::assign ([ array $topic_partitions = NULL ] )更新分配集到$topic_partitions,可以通过调用RdKafka\Conf::setDefaultTopicConf()来更改主题的默认配置$kafkaConsumer->assign([new RdKafka\TopicPartition("logs", 0),new RdKafka\TopicPartition("logs", 1),]);2)public void RdKafka\KafkaConsumer::commit ([ mixed $message_or_offsets = NULL ] )同步提交偏移,直到提交偏移或提交失败为止。如果注册了COMMIT_CB回调,那么它将被调用,并包含未来要使用的调用的提交详细信息。参数message_or_offsetsWhen NULL, commit offsets for the current assignment.When a RdKafka\Message, commit offset for a single topic+partition based on the message.When an array of RdKafka\TopicPartition, commit offsets for the provided list of partitions.异常Errors/ExceptionsThrows RdKafka\Exception on errors.例子:// Commit offsets for the current assignment$kafkaConsumer->commit();// Commit offsets based on the message's topic, partition, and offset$kafkaConsumer->commit($message);// Commit offsets by providing a list of TopicPartition$kafkaConsumer->commit([new RdKafka\TopicPartition($topic, $partition, $offset),]);3)public void RdKafka\KafkaConsumer::commitAsync ([ string $message_or_offsets = NULL ] )异步提交偏移4)public RdKafka\KafkaConsumer::__construct ( RdKafka\Conf $conf )参数conf (RdKafka\Conf)The conf object must have group.id set to the consumer group to join.conf对象必须将Group.id设置为要加入的消费者组。示例:$conf = new RdKafka\Conf();$conf->set("group.id", "myGroupID");$kafkaConsumer = new RdKafka\KafkaConsumer($conf);5)public RdKafka\Message RdKafka\KafkaConsumer::consume ( string $timeout_ms )使用消息或获取错误事件,触发回调将自动调用任何此类排队事件的已注册回调,包括rebalance_cb, event_cb, commit_cb, etc.参数timeout_ms (int) 超时时间(milliseconds)返回值Returns a RdKafka\Message. On error or timeout, RdKafka\Message::$err is != RD_KAFKA_ERR_NO_ERROR, and other properties should be ignored.注意:应用程序应确保定期调用consume (),即使没有预期的消息,为等待调用的排队回调提供服务,当rebalnce_cb已经注册时,这一点尤其重要,因为需要正确地调用和处理它,以同步内部使用者状态。while (true) {$message = $kafkaConsumer->consume(3600e3);switch ($message->err) {case RD_KAFKA_RESP_ERR_NO_ERROR:handle($message);break;case RD_KAFKA_RESP_ERR__TIMED_OUT:echo "Timedout\n";break;default:throw new \Exception($message->errstr());break;}}6)public array RdKafka\KafkaConsumer::getAssignment ( void )返回由assign设置 或 再平衡的 当前分区分配集Returns the current partition assignment as set by RdKafka\KafkaConsumer::assign() or by rebalancing.返回值Returns an array of RdKafka\TopicPartition 返回RdKafka\Topic分区的数组Errors/ExceptionsThrows RdKafka\Exception on errors.6)public RdKafka\Metadata RdKafka\KafkaConsumer::getMetadata ( bool $all_topics , RdKafka\KafkaConsumerTopic $only_topic = NULL , int $timeout_ms)向代理请求元数据参数all_topics (bool)When TRUE, request info about all topics in cluster. Else, only request info about locally known topics.如果为真,请求有关集群中所有主题的信息。否则,只请求有关本地已知主题的信息only_topic (RdKafka\KafkaConsumerTopic) When non-null, only request info about this topic当非空时,只请求有关此主题的信息。timeout_ms (int)Timeout (milliseconds) 超时返回值Returns a RdKafka\Metadata instance 示例$all = $kafkaConsumer->metadata(true, NULL, 60e3);$local = $kafkaConsumer->metadata(false, NULL, 60e3);$topic = $kafkaConsumer->newTopic("myTopic");$one = $kafkaConsumer->metadata(true, $topic, 60e3);7)public array RdKafka\KafkaConsumer::getSubscription ( void )返回RdKafka\KafkaConsumer:subscribe()设置的当前订阅Return the current subscription as set by RdKafka\KafkaConsumer::subscribe()返回值Returns an array of topic names 返回主题名称数组8)public void RdKafka\KafkaConsumer::subscribe ( array $topics ) 将订阅集更新为主题。这将覆盖当前任务。任何先前的订阅都将首先被取消分配和取消订阅。订阅集表示要消费的所需主题.......可以通过调用RdKafka\Conf::setDefaultTopicConf()更改订阅主题的默认配置。$kafkaConsumer->assign(["logs","^myPfx[0-9]_.*",]);9)public ReturnType RdKafka\KafkaConsumer::unsubscribe ( void )从当前订阅集取消订阅RdKafka\KafkaConsumerTopic类(PECL rdkafka >= 1.0.0, librdkafka >= 0.9)This class represents a topic when using the RdKafka\KafkaConsumer. It can not be instantiated directly, RdKafka\KafkaConsumer::newTopic() should be used instead.当想使用RdKafka\KafkaConsumer去表示一个主题的时候,不能直接实例化,应该使用RdKafka\KafkaConsumer::newTopic()替代1)public void RdKafka\KafkaConsumerTopic::offsetStore ( integer $partition , integer $offset )Store offset offset for topic partition partition. The offset will be commited (written) to the offset store according to mit.interval.mit.interval.ms消费者offset提交到zookeeper的频率(以毫秒为单位)(0.9之后就默认存储再broke中了)mit.enable must be set to false when using this API.使用此API时 mit.enable必须设置为false,如果mit设置为true,则消费者偏移量自动提交给Kafka的频率(以毫秒为单位)。auto.offset.resetlargest如果ZooKeeper中没有初始偏移量,或偏移值超出范围,该怎么办?最小:自动将偏移重置为最小偏移最大:自动将偏移重置为最大偏移* 其他任何事情:抛出异常消费者参数partition (integer)Partition IDoffset (integer)Offset2)/* Inherited methods */public string RdKafka\Topic::getName ( void )RdKafka类(PECL rdkafka >= 0.9.1)This is the base class for low-level clients: RdKafka\Consumer, RdKafka\Producer. This class can not be instanciated directly, use one of the sub classes instead.这是低级消费者客户端的基类:RdKafka\Consumer,RdKafka\Producer。不能直接实例化这个类,而是使用其中一个子类。1)public integer RdKafka::addBrokers ( string $broker_list )将一个或多个代理添加到Kafka句柄的初始引导代理列表中。当rdkafka通过查询代理元数据连接到代理时,将自动发现其他代理。如果代理名称解析为多个地址(可能是地址家族),则所有代理名称都将以循环方式用于连接尝试。返回值Returns the number of brokers successfully added.成功添加的代理个数代理还可以使用metadata.broker.list或bootstrap.server配置属性(首选方法)进行定义。$kafka->addBrokers("broker1:10000,broker2");$kafka->addBrokers("SSL://broker3:9000,ssl://broker2");2)public RdKafka\Metadata RdKafka::getMetadata ( bool $all_topics , RdKafka\Topic $only_topic = NULL , int $timeout_ms ) Request Metadata from broker 向代理请求元数据上面有一个3)public integer RdKafka::getOutQLen ( void )返回当前的输出队列长度。Out队列包含等待发送给代理的消息,或代理知道的消息。3)public RdKafka\Queue RdKafka::newQueue ( void )创建一个新的消息队列实例Return ValuesReturns a RdKafka\Queue.4)public RdKafka\Topic RdKafka::newTopic ( string $topic_name [, RdKafka\TopicConf $topic_conf = NULL ] )Creates a new topic instance for topic_name.为Topic_Name创建一个新的主题实例。Returns a RdKafka\Topic (more specifically, either a RdKafka\ConsumerTopic or a RdKafka\ProducerTopic).为具有不同配置的同一主题名称创建两个主题实例没有任何效果。每个主题实例都将使用第一个实例的配置。$conf = new RdKafka\TopicConf();$conf->set("...", "...");$topic = $kafka->newTopic("myTopic", $conf);4)public void RdKafka::poll ( integer $timeout_ms )对于事件的轮询,导致调用应用程序提供的回调使用rdKafka子类的应用程序应该确保定期调用poll(),以便为等待调用的任何排队回调服务。Events:Delivery report callbacks RdKafka\Conf::setDrMsgCb() [producer]Error callbacks (RdKafka\Conf::setErrorCb())Stats callbacks (RdKafka\Conf::setStatsCb())Throttle callbacks (RdKafka\Conf::setThrottleCb())Parameterstimeout_ms (integer)Specifies the maximum amount of time (in milliseconds) that the call will block waiting for events. For non-blocking calls, provide 0 as timeout_ms. To wait indefinately for an event, provide -1.指定调用将阻止等待事件的最大时间(以毫秒为单位)。对于非阻塞调用,提供0作为timeout_ms。若要不确定地等待某个事件,请提供-1。Return ValuesReturns the number of events served.返回服务的事件数5)public void RdKafka::setLogLevel ( integer $level )指定内部Kafka日志记录和调试产生的最大日志记录级别。如果设置了“DEBUG”配置属性,该级别将自动调整为LOG_DEBUG。Parameterslevel (integer)Log level. Can take any LOG_* constant (see the syslog function).日志级别。可以接受任何log_*常量(请参阅syslog函数)。RdKafka\Consumer 类This is the low-level Kafka consumer. It can be used with Kafka >= 0.8.低级消费者1)public RdKafka\Consumer::__construct ([ RdKafka\Conf $conf = NULL ] )Parametersconf (RdKafka\Conf)An optional RdKafka\Conf instance.此类只有继承(低级消费者基类RdKafka)的以下几个方法RdKafka\Consumer extends RdKafka {/* Methods *//* Inherited methods */public integer RdKafka::addBrokers ( string $broker_list )public RdKafka\Metadata RdKafka::getMetadata ( bool $all_topics , RdKafka\Topic $only_topic = NULL , int $timeout_ms )public integer RdKafka::getOutQLen ( void )public RdKafka\Queue RdKafka::newQueue ( void )public RdKafka\Topic RdKafka::newTopic ( string $topic_name [, RdKafka\TopicConf $topic_conf = NULL ] )public void RdKafka::poll ( integer $timeout_ms )public void RdKafka::setLogLevel ( integer $level )}RdKafka\Producer类(PECL rdkafka >= 0.9.1)1)public RdKafka\Producer::__construct ([ RdKafka\Conf $conf = NULL ] )Parametersconf (RdKafka\Conf)An optional RdKafka\Conf instance.RdKafka\Producer extends RdKafka {/* Methods *//* Inherited methods */public integer RdKafka::addBrokers ( string $broker_list )public RdKafka\Metadata RdKafka::getMetadata ( bool $all_topics , RdKafka\Topic $only_topic = NULL , int $timeout_ms )public integer RdKafka::getOutQLen ( void )public RdKafka\Queue RdKafka::newQueue ( void )public RdKafka\Topic RdKafka::newTopic ( string $topic_name [, RdKafka\TopicConf $topic_conf = NULL ] )public void RdKafka::poll ( integer $timeout_ms )public void RdKafka::setLogLevel ( integer $level )}RdKafka\Topic类(PECL rdkafka >= 0.9.1)1)public string RdKafka\Topic::getName ( void )Returns the topic name.返回主题名称RdKafka\ConsumerTopic 类(PECL rdkafka >= 0.9.1)当使用RdKafka\Consumer时,该类表示一个主题。不能直接实例化它,应该使用RdKafka\Consumer:newTopic()。1)public RdKafka\Message RdKafka\ConsumerTopic::consume ( integer $partition , integer $timeout_ms )消费-使用来自分区的单个消息消费者之前必须使用 RdKafka\ConsumerTopic::consumeStart().必须检查返回消息的ERR属性是否存在错误。Err属性等于RD_Kafka_RESP_ERR_PARY_EOF,表示已到达分区的结束,通常不应将其视为错误。应用程序应该处理这种情况(例如,忽略)。Parameterspartition (integer)The partition to consumetimeout_msThe maximum amount of time to wait for a message to be received.Returns a RdKafka\Message or NULL on timeout. 正常返回RdKafka\Message,超时返回NULL。2)public void RdKafka\ConsumerTopic::consumeQueueStart ( integer $partition , integer $offset , RdKafka\Queue $queue )与RdKafka\ConsumerTopic::consumerTopic()相同,但将传入消息重新路由到提供的队列。应用程序必须使用一个RdKafka\Queue::consumer*()函数来接收获取的消息。参数partition (integer)Partition IDoffset (integer)Offsetqueue (RdKafka\Queue)A RdKafka\Queue instance3)public void RdKafka\ConsumerTopic::consumeStart ( integer $partition , integer $offset )开始在偏移量处使用分区的消息(请参阅参数中允许的值)。librdkafka将尝试通过反复从代理获取批消息,直到达到阈值,从而将queued.min.messages (config属性)消息保留在本地队列中。应用程序应该使用RdKafka\ConsumerTopic::consumeStart()方法来使用本地队列中的消息,每个Kafka消息都表示为RdKafka\Message对象。对于同一个主题和分区,不能多次调用RdKafka\ConsumerTopic::consumeStart()。在没有停止消费的情况下,先使用RdKafka\ConsumerTopic::consumeStop()停止消费后再开始消费。Parameterspartition (integer)Partition ID offset (integer)Can be either a proper offset (0..N), or one the the special offset:可以是正常的偏移量(0.N),也可以是特殊的偏移量:RD_KAFKA_OFFSET_BEGINNINGRD_KAFKA_OFFSET_ENDRD_KAFKA_OFFSET_STOREDThe return value of rd_kafka_offset_tail() 示例:$partition = 123;// consume from the end$topic->consumeStart($partition, RD_KAFKA_OFFSET_END);// consume from the stored offset$topic->consumeStart($partition, RD_KAFKA_OFFSET_STORED);// consume 200 messages from the end$topic->consumeStart($partition, rd_kafka_offset_tail(200));4)public void RdKafka\ConsumerTopic::consumeStop ( integer $partition )Stop consuming messages from partition停止使用来自分区的消息停止使用分区消息,清除当前本地队列中的所有消息。5)public void RdKafka\ConsumerTopic::offsetStore ( integer $partition , integer $offset )store offset存储offsetParameterspartition (integer)Partition IDoffset (integer)OffsetRdKafka\ProducerTopic类(PECL rdkafka >= 0.9.1)当使用RdKafka\Producer时,该类表示一个主题。不能直接实例化它,应该使用RdKafka\Producer::newTopic().RdKafka\ProducerTopic extends RdKafka\Topic {/* Methods */public void produce ( integer $partition , integer $msgflags , string $payload [, string $key = NULL ] )/* Inherited methods */public string RdKafka\Topic::getName ( void )}1)public void RdKafka\ProducerTopic::produce ( integer $partition , integer $msgflags , string $payload [, string $key = NULL ] )生成并向代理发送一条消息。这是一个异步和非阻塞的。Parameterspartition (integer)Can be either RD_KAFKA_PARTITION_UA (unassigned) for automatic partitioning using the topic's partitioner function (see RdKafka\TopicConf::setPartitioner(), or a fixed partition (0..N).msgflags (integer)可以是RD_Kafka_PARID_UA(未分配的),用于使用主题的分区函数(请参见RdKafka\TopicConf::setPartitioner(),也可以是固定的分区(0.N)。msgflags (integer)Must be 0payload (string)Payload stringkey (string)Optional message key, if non-NULL it will be passed to the topic partitioner as well as be sent with the message to the broker and passed on to the consumer.可选消息键,如果非空,则将其传递给主题分区程序,并与消息一起发送给代理并传递给使用者。$message = ['type' => 'account-created','id' => $accountId,'date' => date(DATE_W3C),];$payload = json_encode($message);$topic->produce(RD_KAFKA_PARTITION_UA, 0, $payload, $accountId);with the message to the broker and passed on to the consumer.RdKafka\Queuel类(PECL rdkafka >= 0.9.1)1)public RdKafka\Message RdKafka\Queue::consume ( string $timeout_ms )使用一条消息Parameterstimeout_msThe maximum amount of time to wait for a message to be received.Return ValuesReturns a RdKafka\Message or NULL on timeout.RdKafka\Message 类(PECL rdkafka >= 0.9.1)此对象表示单个已消费或生产的消息或事件(设置了$err)。This object represents either a single consumed or produced message, or an event ($err is set).An application must check RdKafka\Message::err to see if the object is a proper message (error is RD_KAFKA_RESP_ERR_NO_ERROR) or an error event.RdKafka\Message {/* Properties */public $err ; //Error codepublic $topic_name ; public $partition ;public $payload ;public $key ;public $offset ;/* Methods */public string errstr ( void )}1)public string RdKafka\Message::errstr ( void )这是一种方便的方法,将错误作为字符串返回Return ValuesThe error as a stringif ($message->err) {echo $message->errstr(), "\n";}RdKafka\Conf 类(PECL rdkafka >= 0.9.1)This class holds configuration for consumers and producers.A list of available properties can be found on the »librdkafka repository. Note that available configuration properties and default values may change depending on the librdkafka version.该类包含使用者和生产者的配置请注意,可用的配置属性和默认值可能会根据librdkafka 版本而改变。RdKafka\Conf {/* Methods */public void dump ( void )public void set ( string $name , string $value )public void setDefaultTopicConf ( RdKafka\TopicConf $topic_conf )public void setDrMsgCb ( callable $callback )public void setErrorCb ( callable $callback )public void setRebalanceCb ( callable $callback )}1)public void RdKafka\Conf::dump ( void )Dumps the configuration properties and values to an array.转储配置属性和值到数组Return ValuesReturns an array with configuration properties as keys, and configuration values as values.2)public void RdKafka\Conf::set ( string $name , string $value )Set configuration property name to value.设置配置属性 属性名=>属性值3)public void RdKafka\Conf::setDefaultTopicConf ( RdKafka\TopicConf $topic_conf )设置用于自动订阅主题的默认主题配置。可以与RdKafka\KafkaConsumer::subscribe()或者RdKafka\KafkaConsumer::assign()一起使用Sets the default topic configuration to use for for automatically subscribed topics. This can be used along with RdKafka\KafkaConsumer::subscribe() or RdKafka\KafkaConsumer::assign().4)public void RdKafka\Conf::setDrMsgCb ( callable $callback )设置传递报告回调,对于RdKafka\ProducerTopic::Producer()接受的每条消息,将调用一次传递报告回调,并将ERR设置为指示生产请求的结果。当消息成功地生成时,或者如果librdkafka 遇到永久故障,或者临时错误的重试计数器已经耗尽,就会调用回调。应用程序必须定期调用rdKafka::poll(),以便为排队的传递报告回调服务。Parameterscallback (callable)A callable with the following signature:<?php/*** @param RdKafka\Kafka $kafka* @param RdKafka\Message $message*/function (RdKafka\Kafka $kafka, RdKafka\Message $message);$conf->setDrMsgCb(function ($kafka, $message) {if ($message->err) {// message permanently failed to be delivered} else {// message successfully delivered}});5)public void RdKafka\Conf::setErrorCb ( callable $callback )设置错误回调。librdkafka 使用错误回调将ciritcal错误信号发送回应用程序。Parameterscallback (callable)A callable with the following signature:<?php/*** @param object $kafka* @param int $err* @param string $reason*/function ($kafka, $err, $reason);<?php$conf->setErrorCb(function ($kafka, $err, $reason) {printf("Kafka error: %s (reason: %s)\n", rd_kafka_err2str($err), $reason);});?>6)public void RdKafka\Conf::setRebalanceCb ( callable $callback )Set rebalance callback for use with coordinated consumer group balancing.设置“再平衡回调”,以便与协调的消费者组 平衡一起使用。Registering a rebalance_cb turns off librdkafka's automatic partition assignment/revocation and instead delegates that responsibility to the application's rebalance_cb.注册一个reBalance_cb会关闭librdkafka的自动分区分配/撤销,而是将这一责任委托给应用程序的reBalance_cb。The rebalance callback is responsible for updating librdkafka's assignment set based on the two events RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONSand RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS but should also be able to handle arbitrary rebalancing failures where err is neither of those.重新平衡回调负责根据这两个事件RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS和RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS更新librdkafka的分配集,但也应该能够再平衡处理任意的不止这些的失败。In this latter case (arbitrary error), the application must $kafka->assign(NULL) to synchronize (同步)state.后一种情况 必须使用assign去同步状态在没有重新平衡回调的情况下,这是由librdkafka自动完成的,但是注册一个重新平衡回调会使应用程序在执行其他操作时具有灵活性,同时还可以执行排序/撤销操作(assinging/revocation)。例如从另一个位置获取偏移量(在赋值时)或手动提交偏移量(在REVOKE上)。Parameterscallback (callable)A callable with the following signature:<?php/*** @param RdKafka\KafkaConsumer $kafka* @param int $err* @param array $partitions*/function (RdKafka\KafkaConsumer $kafka, $err, $partitions);ERR参数被设置为RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS或RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS(或意外错误)。partitions参数是RdKafka\TopicPartition数组,表示分配或撤销的完整分区集。<?php$conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {switch ($err) {case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:// application may load offets from arbitrary external// storage here and update partitions$kafka->assign($partitions);break;case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:if ($manual_commits) {// Optional explicit manual commit$kafka->commit($partitions);}$kafka->assign(NULL);break;default:handle_unlikely_error($err);$kafka->assign(NULL); // sync state同步状态break;}}?>RdKafka\TopicConf类(PECL rdkafka >= 0.9.1)该类保存主题topic实例的配置。A list of available properties can be found on the » librdkafka repository. Note that available configuration properties and default values may change depending on the librdkafka version.注意配置属性依赖版本 , 可以从librdkafka仓库中查看详细配置RdKafka\TopicConf {/* Methods */public void dump ( void )public void set ( void )public void setPartitioner ( integer $partitioner )}1)public void RdKafka\TopicConf::dump ( void )将配置属性和值转储到数组。返回一个数组,其中配置属性作为键,配置值作为值。2)public void RdKafka\TopicConf::set ( void )Set configuration property name to value.3)public void RdKafka\TopicConf::setPartitioner ( integer $partitioner )将分区器设置为根据keys将消息路由到分区。Parameterspartitioner (integer)Must be one of the RD_KAFKA_MSG_PARTITIONER_* constants.必须是RD_Kafka_MSG_Partiator_*常量之一。RdKafka\Exception类(PECL rdkafka >= 0.9.1)RdKafka Exception.异常类RdKafka\Exception extends Exception {/* Inherited properties */protected string $message ;protected int $code ;protected string $file ;protected int $line ;/* Methods *//* Inherited methods */final public string Exception::getMessage ( void )final public Exception Exception::getPrevious ( void )final public mixed Exception::getCode ( void )final public string Exception::getFile ( void )final public int Exception::getLine ( void )final public array Exception::getTrace ( void )final public string Exception::getTraceAsString ( void )public string Exception::__toString ( void )final private void Exception::__clone ( void )}RdKafka\TopicPartition类(PECL rdkafka >= 1.0.0, librdkafka >= 0.9)Topic+Partition 主题加分区RdKafka\TopicPartition {/* Methods */public integer getOffset ( void )public integer getPartition ( void )public string getTopic ( void )public void setOffset ( string $offset )public ReturnType setPartition ( string $partition )public ReturnType setTopic ( string $topic_name )}1)public RdKafka\TopicPartition::__construct ( string $topic , integer $partition [, integer $offset = NULL ] )Parameterstopic (string)Topic namepartition (integer)Partition IDoffset (integer)Offset<?phpnew RdKafka\TopicPartition("myTopic", 1);?>2)public integer RdKafka\TopicPartition::getOffset ( void )获取偏移量3)public integer RdKafka\TopicPartition::getPartition ( void )Gets the partition ID. 得到分区id4)public string RdKafka\TopicPartition::getTopic ( void )Gets the topic name. 得到主题5)public void RdKafka\TopicPartition::setOffset ( string $offset )Sets the offset. 设置偏移量6)public ReturnType RdKafka\TopicPartition::setPartition ( string $partition )Sets the partition ID.7)public ReturnType RdKafka\TopicPartition::setTopic ( string $topic_name )Sets the topic name.RdKafka\Metadata类(PECL rdkafka >= 0.9.1)The Metadata class represents broker information. Metadata instances are returned by RdKafka::getMetadata() and RdKafka\KafkaConsumer::getMetadata().元数据类表示代理信息。元数据实例由RdKafka::getMetadata() 和RdKafka\KafkaConsumer::getMetadata()返回。RdKafka\Metadata {/* Methods */public RdKafka\Metadata\Collection getBrokers ( void )public int getOrigBrokerId ( void )public string getOrigBrokerName ( void )public RdKafka\Metadata\Collection getTopics ( void )}1)public RdKafka\Metadata\Collection RdKafka\Metadata::getBrokers ( void )Get broker listReturns a RdKafka\Metadata\Collection of RdKafka\Metadata\Broker2)public int RdKafka\Metadata::getOrigBrokerId ( void )获取源自此元数据的代理id3)public string RdKafka\Metadata::getOrigBrokerName ( void )获取源自此元数据的代理名称。4)public RdKafka\Metadata\Collection RdKafka\Metadata::getTopics ( void )获取主题列表。根据元数据的请求方式,这可能包含单个主题、本地已知主题列表或所有集群主题。Returns a RdKafka\Metadata\Collection of RdKafka\Metadata\TopicRdKafka\Metadata\Collection类(PECL rdkafka >= 0.9.1)集合类用作元数据项的集合。它实现了 Countable and Iterable,因此它可以与count()和foreach一起使用RdKafka\Metadata\Collection implements Countable , Iterator {/* Methods */public int count ( void )public mixed current ( void )public scalar key ( void )public void next ( void )public void rewind ( void )public boolean valid ( void )}1)public int RdKafka\Metadata\Collection::count ( void )Returns the number of elements as integer 返回元素数量2)public mixed RdKafka\Metadata\Collection::current ( void )Gets the current value. 获取到当前的值返回值:The current value if it is valid or NULL otherwise.3)public scalar RdKafka\Metadata\Collection::key ( void )Get the current key.返回值:The current key if it is valid or NULL otherwise.4)public void RdKafka\Metadata\Collection::next ( void )移到下一个元素。5)public void RdKafka\Metadata\Collection::rewind ( void )将Iterator倒转到第一个元素6)public boolean RdKafka\Metadata\Collection::valid ( void )Checks if current position is valid 检查当前位置是否有效Returns TRUE on success or FALSE on failure.Predefined ConstantsThe constants below are defined by this extension, and will only be available when the extension has either been compiled into PHP or dynamically loaded at runtime.下面的常量是由这个扩展定义的,并且只有当扩展编译到PHP或在运行时动态加载时才可用。RD_KAFKA_CONSUMER (integer)RD_KAFKA_OFFSET_BEGINNING (integer)Start consuming from beginning of kafka partition queue: oldest msg.RD_KAFKA_OFFSET_END (integer)Start consuming from end of kafka partition queue: next msg.RD_KAFKA_OFFSET_STORED (integer)Start consuming from offset retrieved from offset store.RD_KAFKA_PARTITION_UA (integer)The unassigned partition is used by the producer API for messages that should be partitioned using the configured or default partitioner.RD_KAFKA_PRODUCER (integer)RD_KAFKA_VERSION (integer)RD_KAFKA_RESP_ERR__BEGIN (integer)RD_KAFKA_RESP_ERR__BAD_MSG (integer)Local: Bad message formatRD_KAFKA_RESP_ERR__BAD_COMPRESSION (integer)Local: Invalid compressed dataRD_KAFKA_RESP_ERR__DESTROY (integer)Local: Broker handle destroyedRD_KAFKA_RESP_ERR__FAIL (integer)Local: Communication failure with brokerRD_KAFKA_RESP_ERR__TRANSPORT (integer)Local: Broker transport failureRD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE (integer)Local: Critical system resource failureRD_KAFKA_RESP_ERR__RESOLVE (integer)Local: Host resolution failureRD_KAFKA_RESP_ERR__MSG_TIMED_OUT (integer)Local: Message timed outRD_KAFKA_RESP_ERR__PARTITION_EOF (integer)Broker: No more messagesRD_KAFKA_RESP_ERR__UNKNOWN_PARTITION (integer)Local: Unknown partitionRD_KAFKA_RESP_ERR__FS (integer)Local: File or filesystem errorRD_KAFKA_RESP_ERR__UNKNOWN_TOPIC (integer)Local: Unknown topicRD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN (integer)Local: All broker connections are downRD_KAFKA_RESP_ERR__INVALID_ARG (integer)Local: Invalid argument or configurationRD_KAFKA_RESP_ERR__TIMED_OUT (integer)Local: Timed outRD_KAFKA_RESP_ERR__QUEUE_FULL (integer)Local: Queue fullRD_KAFKA_RESP_ERR__ISR_INSUFF (integer)Local: ISR count insufficientRD_KAFKA_RESP_ERR__NODE_UPDATE (integer)Local: Broker node updateRD_KAFKA_RESP_ERR__SSL (integer)Local: SSL errorRD_KAFKA_RESP_ERR__WAIT_COORD (integer)Local: Waiting for coordinatorRD_KAFKA_RESP_ERR__UNKNOWN_GROUP (integer)Local: Unknown groupRD_KAFKA_RESP_ERR__IN_PROGRESS (integer)Local: Operation in progressRD_KAFKA_RESP_ERR__PREV_IN_PROGRESS (integer)Local: Previous operation in progressRD_KAFKA_RESP_ERR__EXISTING_SUBSCRIPTION (integer)Local: Existing subscriptionRD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS (integer)Local: Assign partitionsRD_KAFKA_RESP_ERR__REVOKE_PARTITIONS (integer)Local: Revoke partitionsRD_KAFKA_RESP_ERR__CONFLICT (integer)Local: Conflicting useRD_KAFKA_RESP_ERR__STATE (integer)Local: Erroneous stateRD_KAFKA_RESP_ERR__UNKNOWN_PROTOCOL (integer)Local: Unknown protocolRD_KAFKA_RESP_ERR__NOT_IMPLEMENTED (integer)Local: Not implementedRD_KAFKA_RESP_ERR__AUTHENTICATION (integer)Local: Authentication failureRD_KAFKA_RESP_ERR__NO_OFFSET (integer)Local: No offset storedRD_KAFKA_RESP_ERR__END (integer)RD_KAFKA_RESP_ERR_UNKNOWN (integer)Unknown broker errorRD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE (integer)Broker: Offset out of rangeRD_KAFKA_RESP_ERR_INVALID_MSG (integer)Broker: Invalid messageRD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART (integer)Broker: Unknown topic or partitionRD_KAFKA_RESP_ERR_INVALID_MSG_SIZE (integer)Broker: Invalid message sizeRD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE (integer)Broker: Leader not availableRD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION (integer)Broker: Not leader for partitionRD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT (integer)Broker: Request timed outRD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE (integer)Broker: Broker not availableRD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE (integer)Broker: Replica not availableRD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE (integer)Broker: Message size too largeRD_KAFKA_RESP_ERR_STALE_CTRL_EPOCH (integer)Broker: StaleControllerEpochCodeRD_KAFKA_RESP_ERR_OFFSET_METADATA_TOO_LARGE (integer)Broker: Offset metadata string too largeRD_KAFKA_RESP_ERR_NETWORK_EXCEPTION (integer)Broker: Broker disconnected before response receivedRD_KAFKA_RESP_ERR_GROUP_LOAD_IN_PROGRESS (integer)Broker: Group coordinator load in progressRD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE (integer)Broker: Group coordinator not availableRD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP (integer)Broker: Not coordinator for groupRD_KAFKA_RESP_ERR_TOPIC_EXCEPTION (integer)Broker: Invalid topicRD_KAFKA_RESP_ERR_RECORD_LIST_TOO_LARGE (integer)Broker: Message batch larger than configured server segment sizeRD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS (integer)Broker: Not enough in-sync replicasRD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND (integer)Broker: Message(s) written to insufficient number of in-sync replicasRD_KAFKA_RESP_ERR_INVALID_REQUIRED_ACKS (integer)Broker: Invalid required acks valueRD_KAFKA_RESP_ERR_ILLEGAL_GENERATION (integer)Broker: Specified group generation id is not validRD_KAFKA_RESP_ERR_INCONSISTENT_GROUP_PROTOCOL (integer)Broker: Inconsistent group protocolRD_KAFKA_RESP_ERR_INVALID_GROUP_ID (integer)Broker: Invalid group.idRD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID (integer)Broker: Unknown memberRD_KAFKA_RESP_ERR_INVALID_SESSION_TIMEOUT (integer)Broker: Invalid session timeoutRD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS (integer)Broker: Group rebalance in progressRD_KAFKA_RESP_ERR_INVALID_COMMIT_OFFSET_SIZE (integer)Broker: Commit offset data size is not validRD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED (integer)Broker: Topic authorization failedRD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED (integer)Broker: Group authorization failedRD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED (integer)Broker: Cluster authorization failedRD_KAFKA_CONF_UNKNOWN (integer)RD_KAFKA_CONF_INVALID (integer)RD_KAFKA_CONF_OK (integer)RD_KAFKA_MSG_PARTITIONER_RANDOM (integer)The random partitioner. This was the default partitioner in librdkafka 0.8. Assigns partition randomly.RD_KAFKA_MSG_PARTITIONER_CONSISTENT (integer)The consistent partitioner. Uses consistent hashing to map identical keys onto identical partitions. Uses CRC32 as hashing function. Messages with no key or empty key are always assigned to the same partition.RD_KAFKA_LOG_PRINT (integer)The print logger. Prints messages to stderr.RD_KAFKA_LOG_SYSLOG (integer)The syslog logger. Sends messages to syslog.RD_KAFKA_LOG_SYSLOG_PRINT (integer)The syslog-print partitioner. Sends messages to syslog and prints them to stderr.

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。