Automatically Updating Timestamp Fields for Axon State-Stored Aggregates


A state-stored aggregate in the Axon framework is an aggregate that is also persisted as a classic JPA entity in its own table. It is often used alongside an event store, so the database contains the full history of changes (events) to the entity as well as a relational representation of its current state. This means we have one table for storing the events and a separate table that maps to the JPA entity.

The problem we face here is keeping the entity timestamps in sync with the event timestamps. Entities often have timestamp fields such as created and updated that are populated automatically. In plain JPA this is easily handled with the @PrePersist and @PreUpdate annotations. However, these annotations are not processed by Axon JPA repositories, and even if they were, a timestamp calculated there would differ from the event timestamp. Luckily, the Axon framework provides the tools we need to implement our own solution.

Let’s say that we have a state-stored aggregate (a class with the @Aggregate and @Entity annotations) with the following fields to capture the creation and update times: private LocalDateTime created and private LocalDateTime updated. The solution is to implement the Axon-provided HandlerEnhancerDefinition interface. It acts as a wrapper around event handler methods, which we can use to copy the timestamp carried by the event message onto the entity before it is persisted. Here is a working example of an implementation that will automatically update the created and updated LocalDateTime fields.

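import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.Optional;

import lombok.extern.slf4j.Slf4j;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.annotation.HandlerEnhancerDefinition;
import org.axonframework.messaging.annotation.MessageHandlingMember;
import org.axonframework.messaging.annotation.WrappedMessageHandlingMember;
import org.springframework.stereotype.Component;

@Component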
@Slf4j
public class EventSourcingHandlerEnhancerDefinition implements HandlerEnhancerDefinition {

	@Override
	public <T> MessageHandlingMember<T> wrapHandler(MessageHandlingMember<T> original) {
		// Only wrap handlers annotated with @EventSourcingHandler
		return original.attribute("EventSourcingHandler.payloadType")
				.map(attr -> (MessageHandlingMember<T>)
						new EventSourcingMessageHandlingMember<>(original))
				.orElse(original);
	}

	private static class EventSourcingMessageHandlingMember<T>
			extends WrappedMessageHandlingMember<T> {

		// The handler Axon would have invoked without this wrapper
		private final MessageHandlingMember<T> original;

		private static final String[] TIMESTAMP_FIELDS = {"updated", "created"};

		private EventSourcingMessageHandlingMember(
				MessageHandlingMember<T> delegate) {
			super(delegate);
			original = delegate;
		}

		@Override
		public Object handle(Message<?> message, T target) throws Exception {
			// getTimestamp() already returns an Instant, so no cast is needed
			Instant instant = ((EventMessage<?>) message).getTimestamp();

			if (instant != null) {

				for (String timestampFieldName : TIMESTAMP_FIELDS) {
					this.updateTimestampFields(timestampFieldName, target, instant);
				}

			}

			if (this.original.canHandle(message)) {
				return this.original.handle(message, target);
			}

			// Nothing was handled, so there is no result to return
			return null;
		}

		private boolean shouldSkipCreatedTimestampField(final String timestampField, final T target)
				throws IllegalAccessException {

			if ("created".equals(timestampField)) {
				Optional<Method> methodOptional = this.getMethodOptional("getCreated", target);

				if (methodOptional.isPresent()) {
					Method method = methodOptional.get();

					try {
						Object result = method.invoke(target);
						return result != null;
					} catch (InvocationTargetException e) {
						log.trace(e.getMessage(), e);
						return true;
					}
				}
			}

			return false;
		}

		private Optional<Method> getMethodOptional(final String methodName, final T target) {
			return Arrays.stream(target.getClass().getDeclaredMethods())
					.filter(m -> methodName.equals(m.getName()))
					.findFirst();
		}

		private void updateTimestampFields(final String timestampFieldName, final T target, final Instant instant) {

			try {

				if (shouldSkipCreatedTimestampField(timestampFieldName, target)) {
					return;
				}

				// Build the Java bean setter name, e.g. "updated" -> "setUpdated"
				String setterName = "set"
						+ timestampFieldName.substring(0, 1).toUpperCase()
						+ timestampFieldName.substring(1);

				Optional<Method> methodOptional =
						this.getMethodOptional(setterName, target);

				if (methodOptional.isPresent()) {
					Method method = methodOptional.get();
					method.invoke(target, LocalDateTime.ofInstant(instant, ZoneOffset.UTC));
				}

			} catch (InvocationTargetException | IllegalAccessException e) {
				log.trace("Could not update field {} on Aggregate type {}",
						timestampFieldName,
						target.getClass().getName(),
						e);
			}
		}

	}
}

The first method wrapHandler specifies the type of event handler we wish to wrap. In our case, we want aggregate methods with @EventSourcingHandler since they handle the event as well as the JPA persistence.

Next we have a static nested class which specifies the behavior of our wrapper. MessageHandlingMember original is the original handler defined by Axon, so we keep this reference and call it after our custom logic runs.

String[] TIMESTAMP_FIELDS holds the names of the fields we are updating. Change the values here if your fields are named differently.

The overridden Object handle(Message<?> message, T target) method is where our custom behavior is defined. It grabs the timestamp from the event as an Instant. The helper methods then use reflection to look up the accessors for the created and updated fields: updated is always set to the event timestamp, while created is set only if it is still null, meaning this is a new entity being persisted. At the end of our handle() implementation we call Axon’s implementation so it can continue to persist the event and the entity. (Please note that Java bean style getter and setter methods must exist on the entity for the reflection here to work.)
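To see the reflection step in isolation, here is a minimal sketch that runs without Axon or JPA on the classpath. The Account class, the applyTimestamp helper and the sample instants are hypothetical names made up for this illustration; a real aggregate would also carry the @Aggregate and @Entity annotations. Unlike the enhancer above, this simplified version only touches created when both its getter and setter exist.

```java
import java.lang.reflect.Method;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.Optional;

public class TimestampReflectionSketch {

    // Hypothetical stand-in for the aggregate; only the bean accessors matter here.
    public static class Account {
        private LocalDateTime created;
        private LocalDateTime updated;

        public LocalDateTime getCreated() { return created; }
        public void setCreated(LocalDateTime created) { this.created = created; }
        public LocalDateTime getUpdated() { return updated; }
        public void setUpdated(LocalDateTime updated) { this.updated = updated; }
    }

    // Finds the first declared method with the given name, if there is one.
    static Optional<Method> findMethod(Object target, String name) {
        return Arrays.stream(target.getClass().getDeclaredMethods())
                .filter(m -> name.equals(m.getName()))
                .findFirst();
    }

    // Always sets "updated"; sets "created" only while it is still null.
    public static void applyTimestamp(Object target, Instant eventTimestamp) throws Exception {
        LocalDateTime value = LocalDateTime.ofInstant(eventTimestamp, ZoneOffset.UTC);

        Optional<Method> getCreated = findMethod(target, "getCreated");
        Optional<Method> setCreated = findMethod(target, "setCreated");
        if (getCreated.isPresent() && setCreated.isPresent()
                && getCreated.get().invoke(target) == null) {
            setCreated.get().invoke(target, value);
        }

        Optional<Method> setUpdated = findMethod(target, "setUpdated");
        if (setUpdated.isPresent()) {
            setUpdated.get().invoke(target, value);
        }
    }

    public static void main(String[] args) throws Exception {
        Account account = new Account();
        // The first event plays the role of the creation event, the second of an update.
        applyTimestamp(account, Instant.parse("2024-01-01T10:00:00Z"));
        applyTimestamp(account, Instant.parse("2024-01-02T12:30:00Z"));
        System.out.println("created = " + account.getCreated());
        System.out.println("updated = " + account.getUpdated());
    }
}
```

After the two calls, created still holds the first instant while updated holds the second, which is the behavior we want from the enhancer.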

After adding this implementation and running some tests, we should be able to execute create and update operations on our entities and see that the timestamp columns in the entity table match the timestamps of the events in the domain_event_entry table of the event store.
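One way to check the match in a test is to convert the event timestamp the same way the enhancer does and compare it with the entity column. The sketch below assumes the event timestamp is available as an ISO-8601 string and that the entity column was read back as a LocalDateTime; the class and method names are hypothetical. Keep in mind that your database column may store fewer fractional digits than the event, in which case you would need to truncate both sides before comparing.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class TimestampMatchSketch {

    // Converts an ISO-8601 event timestamp to a UTC LocalDateTime,
    // mirroring the conversion used when the entity fields are set.
    public static LocalDateTime toEntityTimestamp(String eventTimestamp) {
        return LocalDateTime.ofInstant(Instant.parse(eventTimestamp), ZoneOffset.UTC);
    }

    // True when the entity column holds exactly the event's timestamp.
    public static boolean matches(String eventTimestamp, LocalDateTime entityColumn) {
        return toEntityTimestamp(eventTimestamp).equals(entityColumn);
    }

    public static void main(String[] args) {
        // Sample values standing in for one event row and one entity row.
        String eventTimestamp = "2024-03-05T08:15:30.123Z";
        LocalDateTime updatedColumn = LocalDateTime.of(2024, 3, 5, 8, 15, 30, 123_000_000);
        System.out.println("timestamps match: " + matches(eventTimestamp, updatedColumn));
    }
}
```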

That’s all there is to it. I hope this helps!
