19 package org.sleuthkit.autopsy.events;
21 import java.net.URISyntaxException;
22 import java.util.logging.Level;
23 import javax.annotation.concurrent.GuardedBy;
24 import javax.annotation.concurrent.ThreadSafe;
25 import javax.jms.Connection;
26 import javax.jms.DeliveryMode;
27 import javax.jms.JMSException;
28 import javax.jms.Message;
29 import javax.jms.MessageConsumer;
30 import javax.jms.MessageListener;
31 import javax.jms.MessageProducer;
32 import javax.jms.ObjectMessage;
33 import javax.jms.Session;
34 import javax.jms.Topic;
35 import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * Sends events to, and receives events from, a JMS topic reached through an
 * ActiveMQ connection factory. Events read from the topic are handed to a
 * LocalEventPublisher.
 */
46 final class RemoteEventPublisher {
// Logger named after this class.
48 private static final Logger logger = Logger.
getLogger(RemoteEventPublisher.class.getName());
// Value of the "events" string property: set on every outgoing message and
// matched by the consumer's message selector.
49 private static final String ALL_MESSAGE_SELECTOR =
"All";
// Receives the events that arrive from the topic.
50 private final LocalEventPublisher localPublisher;
// Connection created by the ActiveMQ connection factory.
52 private final Connection connection;
// Session created from the connection; used to create the topic, producer,
// consumer and outgoing messages.
54 private final Session session;
// Sends messages to the topic.
56 private final MessageProducer producer;
// Receives messages from the topic.
58 private final MessageConsumer consumer;
/**
 * Creates the connection, session, topic producer and topic consumer for the
 * named event channel and registers a MessageReceiver as the consumer's
 * message listener.
 *
 * @param eventChannelName Name passed to session.createTopic().
 * @param localPublisher   Publisher stored for use with events read from the
 *                         topic.
 * @param info             Supplies the user name, password and URI given to
 *                         the ActiveMQ connection factory.
 *
 * @throws URISyntaxException Declared by the signature; presumably raised by
 *                            info.getURI() - confirm.
 * @throws JMSException       Declared by the signature; presumably raised by
 *                            the JMS set-up calls below - confirm.
 */
75 RemoteEventPublisher(String eventChannelName, LocalEventPublisher localPublisher, MessageServiceConnectionInfo info) throws URISyntaxException, JMSException {
77 this.localPublisher = localPublisher;
78 ActiveMQConnectionFactory connectionFactory =
new ActiveMQConnectionFactory(info.getUserName(), info.getPassword(), info.getURI());
// NOTE(review): trusting all packages removes ActiveMQ's class restriction
// on ObjectMessage payloads. If the broker can carry messages from untrusted
// senders, consider a trusted-package list instead - confirm the threat
// model.
79 connectionFactory.setTrustAllPackages(
true);
80 connection = connectionFactory.createConnection();
// Non-transacted session with automatic acknowledgement.
82 session = connection.createSession(
false, Session.AUTO_ACKNOWLEDGE);
83 Topic topic = session.createTopic(eventChannelName);
84 producer = session.createProducer(topic);
// Outgoing messages use the non-persistent delivery mode.
85 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// The selector accepts only messages whose "events" property equals
// ALL_MESSAGE_SELECTOR. The trailing 'true' is the JMS noLocal argument.
86 consumer = session.createConsumer(topic,
"events = '" + ALL_MESSAGE_SELECTOR +
"'",
true);
87 MessageReceiver receiver =
new MessageReceiver();
88 consumer.setMessageListener(receiver);
89 }
catch (URISyntaxException | JMSException ex) {
// A failure during set-up is logged at SEVERE.
90 logger.log(Level.SEVERE,
"Failed to connect to event channel", ex);
93 }
catch (JMSException ignored) {
// NOTE(review): the try block this catch belongs to is not visible here.
// The exception variable is named 'ignored', so the failure is presumably
// swallowed deliberately - confirm.
/**
 * Stops this publisher. The producer, consumer, session and connection are
 * null-checked in that order. The method is synchronized on this instance.
 *
 * NOTE(review): the bodies of the null checks are not visible here;
 * presumably each non-null resource is closed - confirm.
 *
 * @throws JMSException Declared by the signature.
 */
109 synchronized void stop() throws JMSException {
110 if (null != producer) {
113 if (null != consumer) {
116 if (null != session) {
119 if (null != connection) {
/**
 * Sends an event through the producer as an ObjectMessage. The method is
 * synchronized on this instance.
 *
 * @param event The event to send; it becomes the message's object payload.
 *
 * @throws JMSException Declared by the signature; presumably raised when the
 *                      message cannot be created, populated or sent -
 *                      confirm.
 */
129 synchronized void publish(AutopsyEvent event)
throws JMSException {
130 ObjectMessage message = session.createObjectMessage();
// Tag the message so that it matches the selector used when the consumer
// was created ("events = 'All'").
131 message.setStringProperty(
"events", ALL_MESSAGE_SELECTOR);
132 message.setObject(event);
133 producer.send(message);
// Handling of a received message: only ObjectMessage instances are processed.
// The payload is cast to AutopsyEvent, marked as coming from a remote source
// and passed to the local publisher.
151 if (message instanceof ObjectMessage) {
152 ObjectMessage objectMessage = (ObjectMessage) message;
153 Object
object = objectMessage.getObject();
// NOTE(review): no type check on 'object' is visible before this cast. If
// none exists on the lines not shown, a payload of another type raises
// ClassCastException, which the catch (Throwable) clause below would log -
// confirm.
155 AutopsyEvent
event = (AutopsyEvent)
object;
156 event.setSourceType(AutopsyEvent.SourceType.REMOTE);
157 localPublisher.publish(event);
160 }
catch (JMSException ex) {
// JMS failures while reading the message are logged at SEVERE.
161 logger.log(Level.SEVERE,
"Error receiving message", ex);
162 }
catch (Throwable ex) {
// Any other failure is also logged at SEVERE, under a different message.
164 logger.log(Level.SEVERE,
"Unexpected error receiving message", ex);
// NOTE(review): the two declarations below appear without bodies in this
// view. onMessage(Message) is presumably the MessageListener callback whose
// body is the message-handling code above, and getLogger(String) presumably
// belongs to the Logger type used for the 'logger' field - confirm against
// the complete file.
void onMessage(Message message)
synchronized static Logger getLogger(String name)