
My use case is to run queries with different input parameters to fetch records from a DB. Once the records are fetched, they go through some processing and are finally written to a file. The input parameter values for each query depend on the complete processing of the previous query. My question is: how can the spout know that processing of the previous query has finished, i.e. that its records have been successfully written to the file?

I tried implementing ITridentSpout but still haven't found a solution. Below is my ITridentSpout code.

TridentCoordinator.java

package com.TransactionlTopology;

import java.util.concurrent.ConcurrentHashMap;

import storm.trident.spout.ITridentSpout;

public class TridentCoordinator implements ITridentSpout.BatchCoordinator<ConcurrentHashMap<Long,String>>{

    ConcurrentHashMap<Long,String> prevMetadata=new ConcurrentHashMap<Long, String>();
    boolean result=true;

    @Override
    public void success(long txid) {
        System.out.println("inside success method with txid as  " + txid);
        if (prevMetadata.containsKey(txid)) {
            prevMetadata.replace(txid, "SUCCESS");
        }
    }

    @Override
    public boolean isReady(long txid) {
        if (prevMetadata.isEmpty()) {
            // No transaction in flight yet: start the first one.
            prevMetadata.put(txid, "STARTED");
            result = true;
        } else {
            // Only start a new batch once every previous one has succeeded.
            result = true;
            for (Long prevTxId : prevMetadata.keySet()) {
                System.out.println("txId: " + prevTxId + "  value: " + prevMetadata.get(prevTxId));
                if (!prevMetadata.get(prevTxId).equalsIgnoreCase("SUCCESS")) {
                    result = false;   // a previous batch is still pending
                }
            }
            if (result) {
                prevMetadata.put(txid, "STARTED");
            }
        }

        System.out.println("inside isReady with txid: " + txid + "  result: " + result);

        return result;
    }

    @Override
    public void close() {
        // TODO Auto-generated method stub

    }

    @Override
    public ConcurrentHashMap<Long,String> initializeTransaction(long txid, ConcurrentHashMap<Long,String> prevMetadata, ConcurrentHashMap<Long,String> currMetadata) {
        System.out.println("inside initialize transaction method with values as:----- "+txid+"   "+prevMetadata+"   "+currMetadata);

        return prevMetadata;
    }
}

TridentEmitterImpl.java

package com.TransactionlTopology;

import java.util.concurrent.ConcurrentHashMap;

import storm.trident.operation.TridentCollector;
import storm.trident.spout.ITridentSpout;
import storm.trident.topology.TransactionAttempt;
import backtype.storm.tuple.Values;

public class TridentEmitterImpl implements ITridentSpout.Emitter<ConcurrentHashMap<Long,String>> {

    @Override
    public void emitBatch(TransactionAttempt tx, ConcurrentHashMap<Long,String> coordinatorMeta,TridentCollector collector) {
        System.out.println("inside emitbatch of emitter class with values as:--- "+coordinatorMeta);
        System.out.println("tx.getAttemptId()   "+tx.getAttemptId()+"tx.getTransactionId()  "+tx.getTransactionId()+"tx.getId()  "+tx.getId().toString());
        collector.emit(new Values("preeti"));
    }

    @Override
    public void success(TransactionAttempt tx) {
        System.out.println("inside success of emitter with tx id as   "+tx.getTransactionId());

    }

    @Override
    public void close() {
        // TODO Auto-generated method stub

    }
}

TridentSpoutImpl.java

package com.TransactionlTopology;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import storm.trident.spout.ITridentSpout;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;

public class TridentSpoutImpl implements ITridentSpout<ConcurrentHashMap<Long,String>> {

    @Override
    public storm.trident.spout.ITridentSpout.BatchCoordinator<ConcurrentHashMap<Long,String>> getCoordinator(String txStateId, Map conf, TopologyContext context) {

        return new TridentCoordinator();
    }

    @Override
    public storm.trident.spout.ITridentSpout.Emitter<ConcurrentHashMap<Long,String>> getEmitter(String txStateId, Map conf, TopologyContext context) {

        return new TridentEmitterImpl();
    }

    @Override
    public Map getComponentConfiguration() {

        Map<String,String> newMap=new HashMap<String, String>();
        newMap.put("words","preeti");
        return newMap;
    }

    @Override
    public Fields getOutputFields() {

        return new Fields("word");
    }

}

I also can't understand what values prevMetadata and currMetadata will receive in initializeTransaction. Please suggest a solution.


1 Answer


You have multiple options. Perhaps the simplest, though, is to add a final bolt to the topology that, after writing the file, notifies the spout through a message queue that the spout can watch that it is OK to start the next query. When the spout receives this notification, it can process the next query.
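A minimal sketch of that signaling pattern, with hypothetical names (`BatchSignal`, `markBatchWritten`, `previousBatchDone`) that are not part of any Storm API. It uses an in-process `BlockingQueue`, which only works when the spout and the final bolt run in the same worker JVM; in a real multi-worker topology you would replace it with an external broker such as Kafka or RabbitMQ:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical single-JVM sketch of "notify the spout through a queue".
// The final bolt calls markBatchWritten() after the file write; the
// coordinator's isReady() calls previousBatchDone() to gate the next batch.
public class BatchSignal {
    private static final BlockingQueue<Long> DONE = new LinkedBlockingQueue<>();

    // Called by the final bolt once the batch is safely on disk.
    public static void markBatchWritten(long txid) {
        DONE.offer(txid);
    }

    // Called from the coordinator's isReady(): returns true only when a
    // completed batch has been signaled within the timeout, false otherwise.
    public static boolean previousBatchDone(long timeoutMs) throws InterruptedException {
        return DONE.poll(timeoutMs, TimeUnit.MILLISECONDS) != null;
    }
}
```

The first batch needs special-casing (nothing has been written yet), which is exactly the `prevMetadata.isEmpty()` branch in the coordinator above.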

More generally, though, this seems like a problematic use case for Storm. Many of your topology's resources will likely sit idle much of the time, because only one transaction runs through it at a time. Obviously I don't know all the details of your problem, but this kind of dependency between transactions limits the value of the added complexity that Storm brings.

Answered 2013-12-20T17:47:49.283