0

I am using JGroups with Axon Framework, following this link. I made some changes to the code and am running the project without Docker. Here is my code:

Main Class -

/**
 * Entry point of the demo: runs the primary and the secondary node inside one JVM,
 * each on its own thread.
 */
public class ClusterRunner {

   /**
    * Constructs both nodes and then starts one thread per node.
    *
    * @param args command-line arguments (not used)
    */
   public static void main(String[] args) {

      // Array initializers evaluate left to right, so the primary node (and its
      // thread) is still created before the secondary one.
      Thread[] workers = {
         new Thread(new PrimaryNode()),
         new Thread(new SecondaryNode())
      };

      // Threads are started only after both nodes have been fully constructed.
      for (Thread worker : workers) {
         worker.start();
      }
   }
}

Primary Node -

import org.axonframework.commandhandling.AggregateAnnotationCommandHandler;
import org.axonframework.commandhandling.CommandBus;
import org.axonframework.commandhandling.SimpleCommandBus;
import org.axonframework.commandhandling.distributed.AnnotationRoutingStrategy;
import org.axonframework.commandhandling.distributed.DistributedCommandBus;
import org.axonframework.commandhandling.distributed.commandfilter.AcceptAll;
import org.axonframework.commandhandling.gateway.CommandGateway;
import org.axonframework.commandhandling.gateway.DefaultCommandGateway;
import org.axonframework.commandhandling.model.Repository;
import org.axonframework.eventsourcing.EventSourcingRepository;
import org.axonframework.eventsourcing.eventstore.EmbeddedEventStore;
import org.axonframework.eventsourcing.eventstore.EventStore;
import org.axonframework.eventsourcing.eventstore.inmemory.InMemoryEventStorageEngine;
import org.axonframework.jgroups.commandhandling.JGroupsConnector;
import org.axonframework.serialization.xml.XStreamSerializer;
import org.jgroups.JChannel;

/**
 * Sending side of the demo. On construction it joins the "axon-jgroups-demo" cluster
 * and registers the {@code Item} aggregate as a command handler; when run, it
 * dispatches five {@code CreateItem} commands through the distributed command bus.
 */
public class PrimaryNode implements Runnable {

    /** Number of {@code CreateItem} commands sent by {@link #run()}. */
    private static final int ITEM_COUNT = 5;

    // Assigned inside configureDistributedCommandBus(), hence not final.
    private JGroupsConnector connector;

    private final CommandGateway commandGateway;

    private final EventStore eventStore;

    private final CommandBus commandBus;

    /**
     * Wires the node: in-memory event store, distributed command bus, event-sourcing
     * repository for {@code Item}, and a command gateway on top of the bus.
     *
     * @throws IllegalStateException if the distributed command bus cannot be set up
     *                               (for example the channel configuration cannot be
     *                               loaded or the cluster cannot be joined)
     */
    public PrimaryNode() {

        eventStore = new EmbeddedEventStore(new InMemoryEventStorageEngine());

        try {
            commandBus = configureDistributedCommandBus();
        } catch (Exception e) {
            // Fail fast with the real cause. Continuing with a null command bus would
            // only postpone the failure to the subscribe(...) call below and hide why.
            throw new IllegalStateException(
                    "Primary node could not set up the distributed command bus (tcp.xml)", e);
        }

        Repository<Item> repository = new EventSourcingRepository<>(Item.class, eventStore);

        new AggregateAnnotationCommandHandler<>(Item.class, repository).subscribe(commandBus);

        commandGateway = new DefaultCommandGateway(commandBus);
    }

    /**
     * Sends {@value #ITEM_COUNT} {@code CreateItem} commands, blocking on each one
     * until it has been handled.
     */
    @Override
    public void run() {

        for (int a = 0; a < ITEM_COUNT; a++) {

            // Read the clock once so the logged value is exactly the one sent.
            String timestamp = Long.toString(System.currentTimeMillis());

            System.out.println("Primary Node Created item " + a + " id: " + timestamp);
            commandGateway.sendAndWait(new CreateItem(Integer.toString(a), timestamp));
        }
    }

    /**
     * Builds a {@link DistributedCommandBus} backed by a JGroups channel configured
     * from the classpath resource {@code tcp.xml}, connects to the cluster
     * "axon-jgroups-demo" and waits until the node has joined.
     *
     * @return the distributed command bus, using the connector both as router and
     *         as connector
     * @throws Exception if the channel cannot be created, or connecting to / joining
     *                   the cluster fails
     */
    private CommandBus configureDistributedCommandBus() throws Exception {

        // Local segment: handles the commands that end up being routed to this node.
        CommandBus localSegment = new SimpleCommandBus();

        JChannel channel = new JChannel(getClass().getClassLoader().getResourceAsStream("tcp.xml"));

        connector = new JGroupsConnector(localSegment, channel, "axon-jgroups-demo", new XStreamSerializer(),
            new AnnotationRoutingStrategy());
        // Load factor 100, accepting every command type.
        connector.updateMembership(100, AcceptAll.INSTANCE);

        connector.connect();
        connector.awaitJoined();

        return new DistributedCommandBus(connector, connector);
    }
}

Secondary Node -

import org.axonframework.commandhandling.AggregateAnnotationCommandHandler;
import org.axonframework.commandhandling.CommandBus;
import org.axonframework.commandhandling.SimpleCommandBus;
import org.axonframework.commandhandling.distributed.AnnotationRoutingStrategy;
import org.axonframework.commandhandling.distributed.DistributedCommandBus;
import org.axonframework.commandhandling.distributed.commandfilter.AcceptAll;
import org.axonframework.commandhandling.gateway.CommandGateway;
import org.axonframework.commandhandling.gateway.DefaultCommandGateway;
import org.axonframework.commandhandling.model.Repository;
import org.axonframework.eventhandling.EventListener;
import org.axonframework.eventhandling.SimpleEventHandlerInvoker;
import org.axonframework.eventhandling.SubscribingEventProcessor;
import org.axonframework.eventsourcing.EventSourcingRepository;
import org.axonframework.eventsourcing.eventstore.EmbeddedEventStore;
import org.axonframework.eventsourcing.eventstore.EventStore;
import org.axonframework.eventsourcing.eventstore.inmemory.InMemoryEventStorageEngine;
import org.axonframework.jgroups.commandhandling.JGroupsConnector;
import org.axonframework.serialization.xml.XStreamSerializer;
import org.jgroups.JChannel;

/**
 * Receiving side of the demo. On construction it joins the "axon-jgroups-demo"
 * cluster and registers the {@code Item} aggregate as a command handler; when run,
 * it subscribes a listener to its own event store that prints each event payload.
 */
public class SecondaryNode implements Runnable {

    // Assigned inside configureDistributedCommandBus(), hence not final.
    private JGroupsConnector connector;

    private final EventStore eventStore;

    /**
     * Wires the node: in-memory event store, distributed command bus and an
     * event-sourcing repository for {@code Item} subscribed to that bus.
     *
     * @throws IllegalStateException if the distributed command bus cannot be set up
     *                               (for example the channel configuration cannot be
     *                               loaded or the cluster cannot be joined)
     */
    public SecondaryNode() {

        eventStore = new EmbeddedEventStore(new InMemoryEventStorageEngine());

        CommandBus commandBus;

        try {
            commandBus = configureDistributedCommandBus();
        } catch (Exception e) {
            // Fail fast with the real cause. Continuing with a null command bus would
            // only postpone the failure to the subscribe(...) call below and hide why.
            throw new IllegalStateException(
                    "Secondary node could not set up the distributed command bus (tcp_test.xml)", e);
        }

        Repository<Item> repository = new EventSourcingRepository<>(Item.class, eventStore);

        new AggregateAnnotationCommandHandler<>(Item.class, repository).subscribe(commandBus);
    }

    /**
     * Starts a subscribing event processor named "processor" on this node's event
     * store; its single listener prints the payload of every event it receives.
     */
    @Override
    public void run() {

        // NOTE(review): the listener is attached to this node's own in-memory event
        // store, so it presumably only sees events produced by commands handled on
        // this node. It is also attached only once this thread runs, after the node
        // has already joined the cluster, so earlier events may be missed. Confirm
        // whether both are intended.
        new SubscribingEventProcessor("processor", new SimpleEventHandlerInvoker((EventListener) event -> {

            System.out.println("Secondary Node -- " + event.getPayload());
        }), eventStore).start();
    }

    /**
     * Builds a {@link DistributedCommandBus} backed by a JGroups channel configured
     * from the classpath resource {@code tcp_test.xml}, connects to the cluster
     * "axon-jgroups-demo" and waits until the node has joined.
     *
     * @return the distributed command bus, using the connector both as router and
     *         as connector
     * @throws Exception if the channel cannot be created, or connecting to / joining
     *                   the cluster fails
     */
    private CommandBus configureDistributedCommandBus() throws Exception {

        // Local segment: handles the commands that end up being routed to this node.
        CommandBus localSegment = new SimpleCommandBus();

        JChannel channel = new JChannel(getClass().getClassLoader().getResourceAsStream("tcp_test.xml"));

        connector = new JGroupsConnector(localSegment, channel, "axon-jgroups-demo", new XStreamSerializer(),
                new AnnotationRoutingStrategy());
        // Load factor 100, accepting every command type.
        connector.updateMembership(100, AcceptAll.INSTANCE);

        connector.connect();
        connector.awaitJoined();

        return new DistributedCommandBus(connector, connector);
    }
}

Item -

import org.axonframework.commandhandling.CommandHandler;
import org.axonframework.commandhandling.TargetAggregateIdentifier;
import org.axonframework.commandhandling.model.AggregateIdentifier;
import org.axonframework.eventhandling.EventHandler;

import static org.axonframework.commandhandling.model.AggregateLifecycle.apply;

/**
 * Command asking for a new item to be created. The {@code itemId} field is marked
 * as the target aggregate identifier.
 */
class CreateItem {

    @TargetAggregateIdentifier
    private final String itemId;
    private final String name;

    /**
     * @param itemId identifier of the item to create
     * @param naam   name of the item to create
     */
    public CreateItem(String itemId, String naam) {
        this.name = naam;
        this.itemId = itemId;
    }

    /** @return the identifier given at construction */
    public String getItemId() {
        return this.itemId;
    }

    /** @return the name given at construction */
    public String getName() {
        return this.name;
    }
}

/**
 * Event stating that an item has been created, carrying its identifier and name.
 */
class ItemCreated {
    private final String itemId;
    private final String name;

    /**
     * @param itemId identifier of the created item
     * @param naam   name of the created item
     */
    public ItemCreated(String itemId, String naam) {
        this.name = naam;
        this.itemId = itemId;
    }

    /** @return the identifier given at construction */
    public String getItemId() {
        return this.itemId;
    }

    /** @return the name given at construction */
    public String getName() {
        return this.name;
    }

    /**
     * @return the identifier and the name separated by four spaces; a null field is
     *         rendered as the text {@code null}
     */
    @Override
    public String toString() {
        // String.join renders null elements as "null", same as string concatenation.
        return String.join("    ", this.itemId, this.name);
    }
}

/**
 * Item aggregate. Creation is requested with a {@code CreateItem} command, and the
 * aggregate's state is set from the resulting {@code ItemCreated} event.
 */
class Item {
    @AggregateIdentifier
    private String itemId;
    private String name;

    /**
     * No-argument constructor; leaves all state unset.
     * Presumably required by the framework to instantiate the aggregate - confirm.
     */
    public Item() {
    }

    /**
     * Handles {@code CreateItem} by applying an {@code ItemCreated} event that carries
     * the same identifier and name.
     *
     * @param createItem the creation command
     */
    @CommandHandler
    public Item(CreateItem createItem) {
        String requestedId = createItem.getItemId();
        String requestedName = createItem.getName();
        apply(new ItemCreated(requestedId, requestedName));
    }

    /**
     * Copies the identifier and name from the event into this aggregate.
     *
     * @param itemCreated the event describing the created item
     */
    @EventHandler
    public void itemCreated(ItemCreated itemCreated) {
        this.itemId = itemCreated.getItemId();
        this.name = itemCreated.getName();
    }
}

Now my problem is that when I run the main class, the primary node produces 5 events, but the secondary node does not receive all of them. It may get 2, 3, or 4 events, but not all 5. I want every event to be delivered to the secondary node. I am very new to Axon Framework and JGroups. Please help me understand what the problem is here.

4

2 回答 2

1

By default, Axon will subscribe each of your Event Handlers to the Event Bus (in your case the EmbeddedEventStore). That means a handler is invoked when that specific local instance publishes an Event. And that Event is published when handling a Command. So essentially, the Event Handlers are invoked on the Node that handles the command.

Alternatively, you can configure your event handlers to run in "tracking" mode. In that mode, they open a connection to the Event Store and read events from it, so, depending on the exact configuration, each node can pick up its own copy of an event regardless of where it was published.

于 2017-04-27T14:20:28.550 回答
0

After trying everything else, I decided to experiment with the routing strategy. I used AbstractRoutingStrategy, which lets you decide the routing key for command messages that have no decisive destination. The following working code goes in the primary node (the sender) of the JGroups cluster. Modify the configureDistributedCommandBus() method of the PrimaryNode class as follows:

/**
 * Builds a {@link DistributedCommandBus} backed by a JGroups channel configured from
 * the classpath resource {@code tcp.xml}, using a routing strategy that depends on
 * the current cluster size: with exactly two members every command gets the fixed
 * routing key "secondary", otherwise the command's own identifier is the key.
 *
 * @return the distributed command bus, using the connector both as router and as
 *         connector
 * @throws Exception if the channel cannot be created, or connecting to / joining the
 *                   cluster fails
 */
private CommandBus configureDistributedCommandBus() throws Exception {

    CommandBus commandBus = new SimpleCommandBus();

    // Declared here, and final, so the anonymous routing strategy below can capture
    // it; the snippet previously assigned `channel` without declaring it.
    final JChannel channel = new JChannel(getClass().getClassLoader().getResourceAsStream("tcp.xml"));

    // STATIC_KEY is the policy for commands whose routing key cannot be resolved.
    RoutingStrategy rs = new AbstractRoutingStrategy(UnresolvedRoutingKeyPolicy.STATIC_KEY) {

        @Override
        protected String doResolveRoutingKey(CommandMessage<?> cmdMsg) {

            // The view lists the current cluster members; guard against it not being
            // available yet.
            View view = channel.getView();

            if (view != null && view.getMembers().size() == 2) {

                // NOTE(review): this is a fixed routing key shared by all commands,
                // not a member address - confirm it maps to the intended node.
                return "secondary";
            }

            // Any other cluster size: route by the command message's identifier.
            return cmdMsg.getIdentifier();
        }
    };

    connector = new JGroupsConnector(commandBus, channel, "axon-jgroups-demo", new XStreamSerializer(), rs);
    connector.updateMembership(100, AcceptAll.INSTANCE);

    connector.connect();
    connector.awaitJoined();

    return new DistributedCommandBus(connector, connector);
}

Since I am using JGroups, I can get the view of the cluster, i.e. how many nodes it contains. On that basis I decide how to route each command message.

于 2017-04-27T04:24:25.023 回答