diff --git a/BuiltinKeys.h b/BuiltinKeys.h deleted file mode 100644 index 44f90d4..0000000 --- a/BuiltinKeys.h +++ /dev/null @@ -1,262 +0,0 @@ -#ifndef OMS_BUILTIN_KEYS_H -#define OMS_BUILTIN_KEYS_H - -#include -#include "Namespace.h" - -BEGIN_NAMESPACE_2(io, openmessaging) - - /** - * The {@code MESSAGE_ID} header field contains a value that uniquely identifies - * each message sent by a {@code Producer}. - *

- * When a message is sent, MESSAGE_ID is assigned by the producer. - */ - static const std::string MESSAGE_ID = "MESSAGE_ID"; - - /** - * The {@code DESTINATION} header field contains the destination to which the message is being sent. - *

- * When a message is sent this value is set to the right {@code Queue}, then the message will be sent to - * the specified destination. - *

- * When a message is received, its destination is equivalent to the {@code Queue} where the message resides in. - */ - static const std::string DESTINATION = "DESTINATION"; - - /** - * The {@code RECEIPT_HANDLE} header field contains an identifier associated with the act of receiving a message. - *

- * A new receipt handle will be set properly every time a message is received, and the consumer could use - * it to acknowledge the consumed message. - */ - static const std::string RECEIPT_HANDLE = "RECEIPT_HANDLE"; - - /** - * The {@code BORN_TIMESTAMP} header field contains the time a message was handed - * off to a {@code Producer} to be sent. - *

- * When a message is sent, BORN_TIMESTAMP will be set with current timestamp as the born - * timestamp of a message in client side, on return from the send method, the message's - * BORN_TIMESTAMP header field contains this value. - *

- * When a message is received its, BORN_TIMESTAMP header field contains this same value. - *

- * This filed is a {@code long} value, measured in milliseconds. - */ - static const std::string BORN_TIMESTAMP = "BORN_TIMESTAMP"; - - /** - * The {@code BORN_HOST} header field contains the born host info of a message in client side. - *

- * When a message is sent, BORN_HOST will be set with the local host info, - * on return from the send method, the message's BORN_HOST header field contains this value. - *

- * When a message is received, its BORN_HOST header field contains this same value. - */ - static const std::string BORN_HOST = "BORN_HOST"; - - /** - * The {@code STORE_TIMESTAMP} header field contains the store timestamp of a message in server side. - *

- * When a message is sent, STORE_TIMESTAMP is ignored. - *

- * When the send method returns it contains a server-assigned value. - *

- * This filed is a {@code long} value, measured in milliseconds. - */ - static const std::string STORE_TIMESTAMP = "STORE_TIMESTAMP"; - - /** - * The {@code STORE_HOST} header field contains the store host info of a message in server side. - *

- * When a message is sent, STORE_HOST is ignored. - *

- * When the send method returns it contains a server-assigned value. - */ - static const std::string STORE_HOST = "STORE_HOST"; - - /** - * The {@code START_TIME} header field contains the startup timestamp that a message - * can be delivered to consumer client. - *

- * If START_TIME field isn't set explicitly, use BORN_TIMESTAMP as the startup timestamp. - *

- * This filed is a {@code long} value, measured in milliseconds. - */ - static const std::string START_TIME = "START_TIME"; - - /** - * The {@code STOP_TIME} header field contains the stop timestamp that a message - * should be discarded after this timestamp, and no consumer can consume this message. - *

- * {@code (START_TIME ~ STOP_TIME)} represents an absolute valid interval that a message - * can be delivered in it. - *

- * If an earlier timestamp is set than START_TIME, that means the message does not expire. - *

- * This filed is a {@code long} value, measured in milliseconds. - *

- * When an undelivered message's expiration time is reached, the message should be destroyed. - * OMS does not define a notification of message expiration. - */ - static const std::string STOP_TIME = "STOP_TIME"; - - /** - * The {@code TIMEOUT} header field contains the expiration time, it represents a - * time-to-live value. - *

- * {@code (BORN_TIMESTAMP ~ BORN_TIMESTAMP + TIMEOUT)} represents a relative valid interval - * that a message can be delivered in it. - * If the TIMEOUT field is specified as zero, that indicates the message does not expire. - *

- * The TIMEOUT header field has higher priority than START_TIME/STOP_TIME header fields. - *

- * When an undelivered message's expiration time is reached, the message should be destroyed. - * OMS does not define a notification of message expiration. - */ - static const std::string TIMEOUT = "TIMEOUT"; - - /** - * The {@code PRIORITY} header field contains the priority level of a message, - * a message with a higher priority value should be delivered preferentially. - *

- * OMS defines a ten level priority value with 1 as the lowest priority and 10 as the highest, - * and the default priority is 5. The priority beyond this region will be ignored. - *

- * OMS does not require or provide any guarantee that the message should be delivered - * in priority order strictly, but the vendor should provide a best effort to - * deliver expedited messages ahead of normal messages. - *

- * If PRIORITY field isn't set explicitly, use {@code 5} as the default priority. - */ - static const std::string PRIORITY = "PRIORITY"; - - /** - * The {@code RELIABILITY} header field contains the reliability level of a message, the vendor - * should guarantee the reliability level for a message. - *

- * OMS defines two modes of message delivery: - *

- */ - static const std::string RELIABILITY = "RELIABILITY"; - - /** - * The {@code SEARCH_KEYS} header field contains the multiple search keys of a message. - *

- * The keyword indexes will be built by the search keys, users can query similar - * messages through these indexes and have a quick response. - *

- * This field is a {@code String} value, the different search keys are joined - * together with a comma delimiter. - *

- * OMS defines that a message at most has five search keys. - */ - static const std::string SEARCH_KEYS = "SEARCH_KEYS"; - - /** - * The {@code SCHEDULE_EXPRESSION} header field contains schedule expression of a message. - *

- * The message will be delivered by the specified SCHEDULE_EXPRESSION, which is a CRON expression. - * - * @see https://en.wikipedia.org/wiki/Cron#CRON_expression - */ - static const std::string SCHEDULE_EXPRESSION = "SCHEDULE_EXPRESSION"; - - /** - * The {@code TRACE_ID} header field contains the trace ID of a message, which represents a global and unique - * identification, to associate key events in the whole lifecycle of a message, - * like sent by who, stored at where, and received by who. - *

- * And, the messaging system only plays exchange role in a distributed system in most cases, - * so the TraceID can be used to trace the whole call link with other parts in the whole system. - */ - static const std::string TRACE_ID = "TRACE_ID"; - - /** - * The {@code STREAM_KEY} header field contains the stream key of a message. - * The messages with same stream key should be dispatched to the same stream of the queue. - */ - static const std::string STREAM_KEY = "STREAM_KEY"; - - /** - * The {@code REDELIVERED_NUMBER} header field contains a number, which represents - * the number of message delivery. - */ - static const std::string REDELIVERED_NUMBER = "REDELIVERED_NUMBER"; - - /** - * The {@code REDELIVERED_REASON} header field contains the text description of the reason that causes - * the last message delivery retry. - */ - static const std::string REDELIVERED_REASON = "REDELIVERED_REASON"; - - /** - * The {@code DRIVER_IMPL} key represents the vendor implementation - * entry of {@link MessagingAccessPoint}. - */ - static const std::string DRIVER_IMPL = "DRIVER_IMPL"; - - /** - * The {@code ACCESS_POINTS} key shows the specified access points in OMS driver schema. - * @see Access Point Schema - */ - static const std::string ACCESS_POINTS = "ACCESS_POINTS"; - - /** - * The {@code ACCOUNT_ID} key shows the specified account info in OMS driver schema. - */ - static const std::string ACCOUNT_ID = "ACCOUNT_ID"; - - /** - * The {@code REGION} key shows the specified region in OMS driver schema. - */ - static const std::string REGION = "REGION"; - - /** - * The {@code PRODUCER_ID} key represents the the unique producer id of a producer instance. - */ - static const std::string PRODUCER_ID = "PRODUCER_ID"; - - /** - * The {@code CONSUMER_ID} key represents the the unique consumer id of a consumer instance. - */ - static const std::string CONSUMER_ID = "CONSUMER_ID"; - - /** - * The {@code OPERATION_TIMEOUT} key defines the timeout of almost all the method calls in OMS. - */ - static const std::string OPERATION_TIMEOUT = "OPERATION_TIMEOUT"; - - /** - * The {@code ROUTING_SOURCE} key shows the source queue of a {@code Routing} instance. - *

- * The {@code Routing} consists of a triple, include source queue, destination queue and expression. - */ - static const std::string ROUTING_SOURCE = "ROUTING_SOURCE"; - - /** - * The {@code ROUTING_DESTINATION} key shows the destination queue of a {@code Routing} instance. - *

- * The {@code Routing} consists of a triple, include source queue, destination queue and expression. - */ - static const std::string ROUTING_DESTINATION = "ROUTING_DESTINATION"; - - /** - * The {@code ROUTING_EXPRESSION} key shows the expression of a {@code Routing} instance. - *

- * The {@code Routing} consists of a triple, include source queue, destination queue and expression. - */ - static const std::string ROUTING_EXPRESSION = "ROUTING_EXPRESSION"; - -END_NAMESPACE_2(io, openmessaging) -#endif // OMS_BUILTIN_KEYS_H diff --git a/ByteMessage.h b/ByteMessage.h index bc3dcea..77acc34 100644 --- a/ByteMessage.h +++ b/ByteMessage.h @@ -1,3 +1,19 @@ +/* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + #ifndef OMS_BYTE_MESSAGE_H #define OMS_BYTE_MESSAGE_H diff --git a/Future.h b/Future.h index 0c10441..b33c884 100644 --- a/Future.h +++ b/Future.h @@ -1,3 +1,19 @@ +/* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + #ifndef OMS_FUTURE_H #define OMS_FUTURE_H @@ -24,14 +40,45 @@ BEGIN_NAMESPACE_2(io, openmessaging) } + /** + * Returns {@code true} if this task was cancelled before it completed normally. + * + * @return {@code true} if this task was cancelled before it completed + */ virtual bool isCancelled() = 0; + /** + * Returns {@code true} if this task completed. + *

+ * Completion may be due to normal termination, an exception, or cancellation -- in all of these cases, this method + * will return {@code true}. + * + * @return {@code true} if this task completed + */ virtual bool isDone() = 0; + /** + * Waits if necessary for at most the given time for the computation to complete, and then retrieves its result, if + * available. + * + * @param timeout the maximum time to wait + * @return the computed result

if the computation was cancelled + */ virtual producer::SendResultPtr get(unsigned long timeout = LONG_MAX) = 0; + /** + * Adds the specified listener to this future. The specified listener is notified when this future is done. If this + * future is already completed, the specified listener will be notified immediately. + * + * @param listener FutureListener + */ virtual Future &addListener(FutureListenerPtr listener) = 0; + /** + * Returns the cause of the failed future + * + * @return the cause of the failure. {@code null} if succeeded or this future is not completed yet. + */ virtual std::exception &getThrowable() = 0; }; diff --git a/FutureListener.h b/FutureListener.h index 8ef7c28..ee52a28 100644 --- a/FutureListener.h +++ b/FutureListener.h @@ -1,3 +1,19 @@ +/* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + #ifndef OMS_FUTURE_LISTENER_H #define OMS_FUTURE_LISTENER_H @@ -15,6 +31,10 @@ BEGIN_NAMESPACE_2(io, openmessaging) } + /** + * Invoked when the operation completes, be the associated {@link Promise} successful or not. + * @param future The associated promise facade + */ virtual void operationComplete(const Future& future) = 0; }; diff --git a/Headers.h b/Headers.h new file mode 100644 index 0000000..cda9f3d --- /dev/null +++ b/Headers.h @@ -0,0 +1,335 @@ +/* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef OMS_HEADERS_H +#define OMS_HEADERS_H + +#include + +#include "smart_pointer.h" +#include "Namespace.h" +#include "Uncopyable.h" + +BEGIN_NAMESPACE_2(io, openmessaging) + + /** + * The {@code Message} interface is the root interface of all OMS messages, and the most commonly used OMS message is + * {@link BytesMessage}. + *

+ * Most message-oriented middleware (MOM) products treat messages as lightweight entities that consist of a header and a + * body, like Apache RocketMQ. The header part contains fields used for message + * routing and identification; the body part contains the application data to sent. + *

+ * The {@code Message} is a lightweight entity that only contains properties related to information of a specific message + * object, and the {@code Message} is composed of the following parts: + * + *

+ * + * The body part is deferred to the sub-classes of {@code Message}. + * + * @version OMS 1.0 + * @since OMS 1.0 + */ + class Headers : private Uncopyable { + public: + virtual ~Headers() { + + } + + /** + * The {@code DESTINATION} header field contains the destination to which the message is being sent. + *

+ * When a message is sent this value is set to the right {@code Queue}, then the message will be sent to the + * specified destination. + *

+ * When a message is received, its destination is equivalent to the {@code Queue} where the message resides in. + */ + virtual Headers &setDestination(const std::string &destination) = 0; + + /** + * The {@code MESSAGE_ID} header field contains a value that uniquely identifies each message sent by a {@code + * Producer}. + */ + virtual Headers &setMessageId(const std::string &messageId) = 0; + + /** + * The {@code BORN_TIMESTAMP} header field contains the time a message was handed off to a {@code Producer} to + * be sent. + *

+ * When a message is sent, BORN_TIMESTAMP will be set with current timestamp as the born timestamp of a message + * in client side, on return from the send method, the message's BORN_TIMESTAMP header field contains this + * value. + *

+ * When a message is received its, BORN_TIMESTAMP header field contains this same value. + *

+ * This filed is a {@code long} value, measured in milliseconds. + */ + virtual Headers &setBornTimestamp(long bornTimestamp) = 0; + + /** + * The {@code BORN_HOST} header field contains the born host info of a message in client side. + *

+ * When a message is sent, BORN_HOST will be set with the local host info, on return from the send method, the + * message's BORN_HOST header field contains this value. + *

+ * When a message is received, its BORN_HOST header field contains this same value. + */ + virtual Headers &setBornHost(const std::string &bornHost) = 0; + + /** + * The {@code STORE_TIMESTAMP} header field contains the store timestamp of a message in server side. + *

+ * When a message is sent, STORE_TIMESTAMP is ignored. + *

+ * When the send method returns it contains a server-assigned value. + *

+ * This filed is a {@code long} value, measured in milliseconds. + */ + virtual Headers &setStoreTimestamp(long storeTimestamp) = 0; + + /** + * The {@code STORE_HOST} header field contains the store host info of a message in server side. + *

+ * When a message is sent, STORE_HOST is ignored. + *

+ * When the send method returns it contains a server-assigned value. + */ + virtual Headers &setStoreHost(const std::string &storeHost) = 0; + + /** + * The {@code DELAY_TIME} header field contains a number that represents the delayed times in milliseconds. + *

+ * The message will be delivered after delayTime milliseconds starting from {@CODE BORN_TIMESTAMP} . When this + * filed isn't set explicitly, this means this message should be delivered immediately. + */ + virtual Headers &setDelayTime(long delayTime) = 0; + + /** + * The {@code EXPIRE_TIME} header field contains the expiration time, it represents a time-to-live value. + *

+ * The {@code EXPIRE_TIME} represents a relative valid interval that a message can be delivered in it. If the + * EXPIRE_TIME field is specified as zero, that indicates the message does not expire. + *

+ *

+ * When an undelivered message's expiration time is reached, the message should be destroyed. OMS does not + * define a notification of message expiration. + *

+ */ + virtual Headers &setExpireTime(long expireTime) = 0; + + /** + * The {@code PRIORITY} header field contains the priority level of a message, a message with a higher priority + * value should be delivered preferentially. + *

+ * OMS defines a ten level priority value with 1 as the lowest priority and 10 as the highest, and the default + * priority is 5. The priority beyond this region will be ignored. + *

+ * OMS does not require or provide any guarantee that the message should be delivered in priority order + * strictly, but the vendor should provide a best effort to deliver expedited messages ahead of normal + * messages. + *

+ * If PRIORITY field isn't set explicitly, use {@code 5} as the default priority. + */ + virtual Headers &setPriority(short priority) = 0; + + /** + * The {@code RELIABILITY} header field contains the reliability level of a message, the vendor should guarantee + * the reliability level for a message. + *

+ * OMS defines two modes of message delivery: + *

+ */ + virtual Headers &setDurability(short durability) = 0; + + /** + * The {@code messagekey} header field contains the custom key of a message. + *

+ * This key is a customer identifier for a class of messages, and this key may be used for server to hash or + * dispatch messages, or even can use this key to implement order message. + *

+ */ + virtual Headers &setMessageKey(const std::string &messageKey) = 0; + + /** + * The {@code TRACE_ID} header field contains the trace ID of a message, which represents a global and unique + * identification, to associate key events in the whole lifecycle of a message, like sent by who, stored at + * where, and received by who. + *

+ * And, the messaging system only plays exchange role in a distributed system in most cases, so the TraceID can + * be used to trace the whole call link with other parts in the whole system. + */ + virtual Headers &setTraceId(const std::string &traceId) = 0; + + /** + * The {@code DELIVERY_COUNT} header field contains a number, which represents the count of the message + * delivery. + */ + virtual Headers &setDeliveryCount(short deliveryCount) = 0; + + /** + * This field {@code TRANSACTION_ID} is used in transactional message, and it can be used to trace a + * transaction. + *

+ * So the same {@code TRANSACTION_ID} will be appeared not only in prepare message, but also in commit message, + * and consumer received message also contains this field. + */ + virtual Headers &setTransactionId(const std::string &transactionId) = 0; + + /** + * A client can use the {@code CORRELATION_ID} field to link one message with another. A typical use is to link + * a response message with its request message. + */ + virtual Headers &setCorrelationId(const std::string &scorrelationId) = 0; + + /** + * The field {@code COMPRESSION} in headers represents the message body compress algorithm. vendors are free to + * choose the compression algorithm, but must ensure that the decompressed message is delivered to the user. + */ + virtual Headers &setCompression(short compression) = 0; + + /** + * See {@link Headers#setDestination(String)} + * + * @return destination + */ + virtual std::string getDestination() = 0; + + /** + * See {@link Headers#setMessageId(String)} + * + * @return messageId + */ + virtual std::string getMessageId() = 0; + + /** + * See {@link Headers#setBornTimestamp(long)} + * + * @return bornTimestamp + */ + virtual long getBornTimestamp() = 0; + + /** + * See {@link Headers#setBornHost(String)} + * + * @return bornHost + */ + virtual std::string getBornHost() = 0; + + /** + * See {@link Headers#setStoreTimestamp(long)} + * + * @return storeTimestamp + */ + virtual long getStoreTimestamp() = 0; + + /** + * See {@link Headers#setStoreHost(String)} + * + * @return storeHost + */ + virtual std::string getStoreHost() = 0; + + /** + * See {@link Headers#setDelayTime(long)} + * + * @return delayTime + */ + virtual long getDelayTime() = 0; + + /** + * See {@link Headers#setExpireTime(long)} + * + * @return expireTime + */ + virtual long getExpireTime() = 0; + + /** + * See {@link Headers#setPriority(short)} + * + * @return priority + */ + virtual short getPriority() = 0; + + /** + * See {@link Headers#setDurability(short)} + * + * @return durability + */ + virtual short getDurability() = 0; + + /** + * See {@link Headers#setMessageKey(String)} + * + * @return messageKey + */ + virtual std::string getMessageKey() = 0; + + /** + * See {@link Headers#setTraceId(String)} + * + * @return traceId + */ + virtual std::string getTraceId() = 0; + + /** + * See {@link Headers#setDeliveryCount(int)} + * + * @return deliveryCount + */ + virtual short getDeliveryCount() = 0; + + /** + * See {@link Headers#setTransactionId(String)} + * + * @return transactionId + */ + virtual std::string getTransactionId() = 0; + + /** + * See {@link Headers#setCorrelationId(String)} + * + * @return correlationId + */ + virtual std::string getCorrelationId() = 0; + + /** + * See {@link Headers#setCompression(short)} + * + * @return compression + */ + virtual short getCompression() = 0; + + }; + + typedef NS::shared_ptr HeadersPtr; + +END_NAMESPACE_2(io, openmessaging) + +#endif // OMS_HEADERS_H diff --git a/KeyValue.h b/KeyValue.h index 73428b0..45cd252 100644 --- a/KeyValue.h +++ b/KeyValue.h @@ -1,3 +1,19 @@ +/* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + #ifndef OMS_KEY_VALUE_H #define OMS_KEY_VALUE_H @@ -33,6 +49,8 @@ BEGIN_NAMESPACE_2(io, openmessaging) } + virtual KeyValue &put(const std::string &key, short value) = 0; + virtual KeyValue &put(const std::string &key, int value) = 0; virtual KeyValue &put(const std::string &key, long value) = 0; @@ -41,6 +59,8 @@ BEGIN_NAMESPACE_2(io, openmessaging) virtual KeyValue &put(const std::string &key, const std::string &value) = 0; + virtual short getShort(const std::string &key, short defaultValue = 0) = 0; + virtual int getInt(const std::string &key, int defaultValue = 0) = 0; virtual long getLong(const std::string &key, long defaultValue = 0L) = 0; diff --git a/Message.h b/Message.h index 1d5e587..1b9572f 100644 --- a/Message.h +++ b/Message.h @@ -1,3 +1,19 @@ +/* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + #ifndef OMS_MESSAGE_H #define OMS_MESSAGE_H @@ -5,7 +21,7 @@ #include "smart_pointer.h" #include "KeyValue.h" -#include "BuiltinKeys.h" +#include "OMSBuiltinKeys.h" #include "Namespace.h" #include "Uncopyable.h" @@ -40,26 +56,8 @@ BEGIN_NAMESPACE_2(io, openmessaging) } - virtual KeyValuePtr sysHeaders() = 0; - - virtual KeyValuePtr userHeaders() = 0; - - virtual Message& putSysHeaders(const std::string &key, int value) = 0; - - virtual Message& putSysHeaders(const std::string &key, long value) = 0; - - virtual Message& putSysHeaders(const std::string &key, double value) = 0; - - virtual Message& putSysHeaders(const std::string &key, const std::string &value) = 0; - - virtual Message& putUserHeaders(const std::string &key, int value) = 0; - - virtual Message& putUserHeaders(const std::string &key, long value) = 0; - - virtual Message& putUserHeaders(const std::string &key, double value) = 0; - - virtual Message& putUserHeaders(const std::string &key, const std::string &value) = 0; - + virtual KeyValuePtr properties() = 0; + virtual HeadersPtr headers() = 0; }; typedef NS::shared_ptr MessagePtr; diff --git a/MessageFactory.h b/MessageFactory.h index 537e274..64b12c2 100644 --- a/MessageFactory.h +++ b/MessageFactory.h @@ -1,3 +1,19 @@ +/* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + #ifndef OMS_MESSAGE_FACTORY_H #define OMS_MESSAGE_FACTORY_H @@ -29,7 +45,7 @@ BEGIN_NAMESPACE_2(io, openmessaging) * @return the created {@code BytesMessage} object * @throws OMSRuntimeException if the OMS provider fails to create this message due to some internal error. */ - virtual ByteMessagePtr createBytesMessage(const std::string &topic, const MessageBody &body) = 0; + virtual ByteMessagePtr createMessage(const std::string &queueName, const MessageBody &body) = 0; }; END_NAMESPACE_2(io, openmessaging) diff --git a/MessagingAccessPoint.h b/MessagingAccessPoint.h index 0c97795..988e326 100644 --- a/MessagingAccessPoint.h +++ b/MessagingAccessPoint.h @@ -1,3 +1,19 @@ +/* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + #ifndef OMS_MESSAGING_ACCESS_POINT_H #define OMS_MESSAGING_ACCESS_POINT_H @@ -8,8 +24,7 @@ #include "producer/Producer.h" #include "consumer/PullConsumer.h" #include "consumer/PushConsumer.h" -#include "consumer/StreamingConsumer.h" -#include "ResourceManager.h" +#include "manager/ResourceManager.h" #include "observer/Observer.h" #include "Namespace.h" @@ -45,7 +60,7 @@ BEGIN_NAMESPACE_2(io, openmessaging) * @return the OMS version of implementation * @see OMS#specVersion */ - virtual std::string implVersion() = 0; + virtual std::string version() = 0; /** * Returns the attributes of this {@code MessagingAccessPoint} instance. @@ -71,7 +86,8 @@ BEGIN_NAMESPACE_2(io, openmessaging) * @throws OMSRuntimeException if the {@code MessagingAccessPoint} fails to handle this request * due to some internal error */ - virtual producer::ProducerPtr createProducer(const KeyValuePtr &properties = kv_nullptr) = 0; + virtual producer::ProducerPtr createProducer() = 0; + virtual producer::ProducerPtr createProducer(const TransactionStateCheckListenerPtr &transactionStateCheckListener) = 0; /** * Creates a new {@code PushConsumer} for the specified {@code MessagingAccessPoint}. @@ -82,25 +98,7 @@ BEGIN_NAMESPACE_2(io, openmessaging) * @throws OMSRuntimeException if the {@code MessagingAccessPoint} fails to handle this request * due to some internal error */ - virtual consumer::PushConsumerPtr createPushConsumer(const KeyValuePtr &properties = kv_nullptr) = 0; - - /** - * Creates a new {@code PullConsumer} for the specified {@code MessagingAccessPoint}. - * - * @return the created {@code PullConsumer} - * @throws OMSRuntimeException if the {@code MessagingAccessPoint} fails to handle this request - * due to some internal error - */ - virtual consumer::PullConsumerPtr createPullConsumer(const KeyValuePtr &properties = kv_nullptr) = 0; - - /** - * Creates a new {@code StreamingConsumer} for the specified {@code MessagingAccessPoint}. - * - * @return the created {@code Stream} - * @throws OMSRuntimeException if the {@code MessagingAccessPoint} fails to handle this request - * due to some internal error - */ - virtual consumer::StreamingConsumerPtr createStreamingConsumer(const KeyValuePtr &properties = kv_nullptr) = 0; + virtual consumer::PushConsumerPtr createConsumer() = 0; /** * Gets a lightweight {@code ResourceManager} instance from the specified {@code MessagingAccessPoint}. diff --git a/Namespace.h b/Namespace.h index a3de462..be863e9 100644 --- a/Namespace.h +++ b/Namespace.h @@ -1,3 +1,19 @@ +/* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + #ifndef OMS_NAMESPACE_H #define OMS_NAMESPACE_H diff --git a/OMS.h b/OMS.h index 52063be..c70c929 100644 --- a/OMS.h +++ b/OMS.h @@ -1,3 +1,19 @@ +/* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + #ifndef OMS_OMS_H #define OMS_OMS_H diff --git a/OMSBuiltinKeys.h b/OMSBuiltinKeys.h new file mode 100644 index 0000000..0240b20 --- /dev/null +++ b/OMSBuiltinKeys.h @@ -0,0 +1,32 @@ +/* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef OMS_BUILTIN_KEYS_H +#define OMS_BUILTIN_KEYS_H + +#include +#include "Namespace.h" + +BEGIN_NAMESPACE_2(io, openmessaging) + + static const std::string DRIVER_IMPL = "DRIVER_IMPL"; + static const std::string ACCESS_POINTS = "ACCESS_POINTS"; + static const std::string ACCOUNT_ID = "ACCOUNT_ID"; + static const std::string REGION = "REGION"; + static const std::string OPERATION_TIMEOUT = "OPERATION_TIMEOUT"; + +END_NAMESPACE_2(io, openmessaging) +#endif // OMS_BUILTIN_KEYS_H diff --git a/OMSException.h b/OMSException.h deleted file mode 100644 index 23b9b52..0000000 --- a/OMSException.h +++ /dev/null @@ -1,25 +0,0 @@ -#ifndef OMS_OMS_EXCEPTION_H -#define OMS_OMS_EXCEPTION_H - -#include -#include "Namespace.h" - -BEGIN_NAMESPACE_2(io, openmessaging) - - class OMSException { - public: - OMSException(const std::string &reason = "Unknown Reason") - : _reason(reason) { - - } - - const char *what() const { - return _reason.c_str(); - } - - private: - const std::string _reason; - }; - -END_NAMESPACE_2(io, openmessaging) -#endif //OMS_OMS_EXCEPTION_H diff --git a/OMSResponseStatus.h b/OMSResponseStatus.h new file mode 100644 index 0000000..4aafc64 --- /dev/null +++ b/OMSResponseStatus.h @@ -0,0 +1,53 @@ +/* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef OMS_RESPONSE_STATUS_H +#define OMS_RESPONSE_STATUS_H +BEGIN_NAMESPACE_u2(io, openmessaging) +enum OMSResponceStatus{ + UnsupportedVersion = 1101, + Success = 1200, + BadRequest = 1400, + Unauthorized = 1401, + MessageBodyRequired = 1402, + Forbidden = 1403, + DestinationNotFound = 1404, + NamespaceNotFound = 1405, + DestinationAlreadyExists = 1406, + NamespaceAlreadyExists = 1407, + ConsumerIdAlreadyExists = 1408, + ProducerIdAlreadyExists = 1409, + RequestTimeout = 1410, + MessageAttributesTooLarge = 1411, + MessageHeaderTooLarge = 1412, + MessageBodyTooLarge = 1413, + NoNewMessageFound = 1414, + MaxTopicsReached = 1415, + MaxQueuesReached = 1416, + MaxNamespacesReached = 1417, + BadParameter = 1416, + ServerStatus = 1500, + StorageServiceStatus = 1501, + StorageServiceBusy = 1502, + ServiceNotAvailable = 1503, + FlushDiskTimeout = 1504, + CreateMessagingAccessPointFailed = 10000, + IllegalDriver = 10001, + IllegalVersion = 10002, + VersionNotCompatibleSpecification = 10003, +}; +END_NAMESPACE_2(io, openmessaging) +#endif //OMS_RESPONSE_STATUS_H diff --git a/OpenMessaging.h b/OpenMessaging.h index a01ea4c..c26604e 100644 --- a/OpenMessaging.h +++ b/OpenMessaging.h @@ -1,8 +1,24 @@ +/* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + #ifndef OMS_OPEN_MESSAGING_H #define OMS_OPEN_MESSAGING_H #include "OMS.h" #include "MessagingAccessPoint.h" -#include "BuiltinKeys.h" +#include "OMSBuiltinKeys.h" #endif //OMS_OPEN_MESSAGING_H diff --git a/Promise.h b/Promise.h index 55cbea5..6f6dbf1 100644 --- a/Promise.h +++ b/Promise.h @@ -1,3 +1,19 @@ +/* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + #ifndef OMS_PROMISE_H #define OMS_PROMISE_H diff --git a/README.md b/README.md index 26b93c4..ff3aa39 100644 --- a/README.md +++ b/README.md @@ -2,16 +2,8 @@ [![Gitter chat](https://badges.gitter.im/gitterHQ/gitter.png)](https://gitter.im/openmessaging/public) [![License](https://img.shields.io/badge/license-Apache%202-4EB1BA.svg)](https://www.apache.org/licenses/LICENSE-2.0.html) -### A vendor-neutral open standard for distributed messaging and streaming -OpenMessaging, which includes the establishment of industry guidelines and messaging, streaming specifications to provide a common framework for finance, e-commerce, IoT and big-data area. The design principles are the cloud-oriented, simplicity, flexibility, and language independent in distributed heterogeneous environments. Conformance to these specifications will make it possible to develop a heterogeneous messaging applications across all major platforms and operating systems. - -## The domain architecture -![domain-design](assets/images/OpenMessaging-V0.3.0-alpha.png) - -## Introduction -This repository contains Open Messaging Specification APIs in C++. An example implementation can be found [here](https://github.com/openmessaging/openmessaging-client-sdk-cpp). - C++ APIs are designed to be consistent with [Java APIs](https://openmessaging.github.io/openmessaging-java/) in terms of semantics. +Version: See [Java APIs](https://openmessaging.github.io/openmessaging-java/). ## ![Powered by Linux Foundation](http://openmessaging.cloud/images/linux-foundation-logo.png) \ No newline at end of file diff --git a/ResourceManager.h b/ResourceManager.h deleted file mode 100644 index 525cd20..0000000 --- a/ResourceManager.h +++ /dev/null @@ -1,205 +0,0 @@ -#ifndef OMS_RESOURCE_MANAGER_H -#define OMS_RESOURCE_MANAGER_H - -#include - -#include "smart_pointer.h" -#include "ServiceLifecycle.h" -#include "KeyValue.h" -#include "routing/Routing.h" -#include "Namespace.h" - -BEGIN_NAMESPACE_2(io, openmessaging) - - /** - * The {@code ResourceManager} is to provide a unified interface of resource management, - * allowing developers to manage the namespace, queue and routing resources. - *

- * Create, set, get and delete are the four basic operations of {@code ResourceManager}. - *

- * {@code ResourceManager} also supports fetching and updating resource attributes dynamically. - *

- * {@link MessagingAccessPoint#resourceManager()} ()} is the unique method to obtain a {@code ResourceManager} - * instance. Changes made through this instance will immediately apply to the message-oriented middleware (MOM) behind - * {@code MessagingAccessPoint}. - *

- * All operations conducted via this instance are confined to the configured namespace, - * with default namespace derived from the OMS driver url of {@code MessagingAccessPoint}.Change namespace - * by {@link ResourceManager#switchNamespace(String)} whenever necessary. - * - * @version OMS 1.0.0 - * @since OMS 1.0.0 - */ - class ResourceManager : public virtual Uncopyable { - public: - virtual ~ResourceManager() { - - } - - /** - * Creates a {@code Namespace} resource with some preset attributes. - * - * @param nsName the name of the new namespace - * @param attributes the preset attributes - */ - virtual void createNamespace(const std::string &ns, KeyValuePtr &attributes) = 0; - - /** - * Sets the attributes of the configured namespace, the old attributes will be replaced - * by the provided attributes, only the provided key will be updated. - * - * @param attributes the new attributes - * @throws OMSResourceNotExistException if the configured namespace does not exist - */ - virtual void setNamespaceAttributes(const KeyValuePtr &attributes) = 0; - - /** - * Gets the attributes of the configured namespace. - * - * @return the attributes of namespace - * @throws OMSResourceNotExistException if the configured namespace does not exist - */ - virtual KeyValuePtr getNamespaceAttributes() = 0; - - /** - * Deletes an existing namespace resource. - * - * @param ns the namespace to delete - * @throws OMSResourceNotExistException if the specified namespace does not exist - */ - virtual void deleteNamespace(const std::string &ns) = 0; - - /** - * Gets the namespace list in the current {@code MessagingAccessPoint}. - * - * @return the list of all namespaces - */ - virtual std::vector listNamespaces() = 0; - - /** - * Switches the default namespace to the new one, and all the operations will reflect to - * the new namespace after the method returns successfully. - * - * @param ns the target namespace to switch to - * @throws OMSResourceNotExistException if the new namespace does not exist - */ - virtual void switchNamespace(const std::string &ns) = 0; - - /** - * Creates a {@code Queue} resource in the configured namespace with some preset attributes. - * - * @param queueName the name of the new queue - * @param attributes the preset attributes - * @throws OMSResourceNotExistException if the specified queue name is duplicated - */ - virtual void createQueue(const std::string &ns, const std::string &queue, KeyValuePtr attributes) = 0; - - /** - * Sets the attributes of the specified queue, the old attributes will be replaced - * by the provided attributes, only the provided key will be updated. - * - * @param queueName the queue name - * @param attributes the new attributes - * @throws OMSResourceNotExistException if the specified queue does not exist - */ - virtual void setQueueAttributes(const std::string &ns, const std::string &queue, KeyValuePtr attributes) = 0; - - /** - * Gets the attributes of the specified queue. - * - * @param queueName the queue name - * @return the attributes of namespace - * @throws OMSResourceNotExistException if the specified queue or namespace does not exist - */ - virtual KeyValuePtr getQueueAttributes(const std::string &ns, const std::string &queue) = 0; - - /** - * Deletes an existing queue resource. - * - * @param queueName the queue needs to be deleted - * @throws OMSResourceNotExistException if the specified queue or namespace does not exist - */ - virtual void deleteQueue(const std::string &queue) = 0; - - /** - * Gets the queue list in the configured namespace. - * - * @return the list of all queues - */ - virtual std::vector listQueues() = 0; - - /** - * Creates a {@code Routing} resource in the configured namespace with some preset attributes. - * - * @param routingName the name of the new routing - * @param attributes the preset attributes - * @throws OMSResourceNotExistException if the configured namespace is not exists - */ - virtual void createRouting(const std::string &routingName, const KeyValuePtr &attributes) = 0; - - /** - * Sets the attributes of the specified routing, the old attributes will be replaced - * by the provided attributes, only the provided key will be updated. - * - * @param routingName the routing name - * @param attributes the new attributes - * @throws OMSResourceNotExistException if the specified routing or namespace is not exists - */ - virtual void setRoutingAttributes(const std::string &routingName, const KeyValuePtr &attributes) = 0; - - /** - * Gets the attributes of the specified routing. - * - * @param routingName the routing name - * @return the attributes of routing - * @throws OMSResourceNotExistException if the specified routing or namespace is not exists - */ - virtual KeyValuePtr getRoutingAttributes(const std::string &queueName) = 0; - - /** - * Deletes an existing routing resource. - * - * @param routingName the routing needs to be deleted - * @throws OMSResourceNotExistException if the specified routing or namespace is not exists - */ - virtual void deleteRouting(const std::string &ns, const std::string &route) = 0; - - /** - * Gets the routing list in the configured namespace. - * - * @return the list of all routings - * @throws OMSResourceNotExistException if the configured namespace is not exists - */ - virtual std::vector listRoutings(const std::string &ns) = 0; - - /** - * Gets the stream list behind the specified queue. - * - * @param queueName the queue name - * @return the list of all streams - */ - virtual std::vector listStreams(const std::string &queueName) = 0; - - /** - * Updates some system headers of a message in the configured namespace. - *

- * Below system headers are allowed to be changed dynamically: - *

    - *
  • {@link Message.BuiltinKeys#START_TIME}
  • - *
  • {@link Message.BuiltinKeys#STOP_TIME}
  • - *
  • {@link Message.BuiltinKeys#TIMEOUT}
  • - *
  • {@link Message.BuiltinKeys#PRIORITY}
  • - *
  • {@link Message.BuiltinKeys#SCHEDULE_EXPRESSION}
  • - *
- * - * @param messageId the id of message - * @param headers the new headers - */ - virtual void updateMessage(const std::string &messageId, KeyValuePtr &headers) = 0; - }; - - typedef NS::shared_ptr ResourceManagerPtr; - -END_NAMESPACE_2(io, openmessaging) - -#endif // OMS_RESOURCE_MANAGER_H diff --git a/ServiceLifecycle.h b/ServiceLifecycle.h index 840106c..bec9452 100644 --- a/ServiceLifecycle.h +++ b/ServiceLifecycle.h @@ -1,3 +1,19 @@ +/* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + #ifndef OMS_SERVICE_LIFECYCLE_H #define OMS_SERVICE_LIFECYCLE_H diff --git a/Uncopyable.h b/Uncopyable.h index 4ac3e2c..57cc64a 100644 --- a/Uncopyable.h +++ b/Uncopyable.h @@ -1,3 +1,19 @@ +/* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + #ifndef OMS_UNCOPYABLE_H #define OMS_UNCOPYABLE_H diff --git a/assets/images/OpenMessaging-V0.2.0-alpha.png b/assets/images/OpenMessaging-V0.2.0-alpha.png deleted file mode 100644 index c82b033..0000000 Binary files a/assets/images/OpenMessaging-V0.2.0-alpha.png and /dev/null differ diff --git a/assets/images/OpenMessaging-V0.3.0-alpha.png b/assets/images/OpenMessaging-V0.3.0-alpha.png deleted file mode 100644 index 368cf4f..0000000 Binary files a/assets/images/OpenMessaging-V0.3.0-alpha.png and /dev/null differ diff --git a/assets/images/domain-design.png b/assets/images/domain-design.png deleted file mode 100644 index 19190e5..0000000 Binary files a/assets/images/domain-design.png and /dev/null differ diff --git a/consumer/Consumer.h b/consumer/Consumer.h new file mode 100644 index 0000000..ae5cca6 --- /dev/null +++ b/consumer/Consumer.h @@ -0,0 +1,138 @@ +/* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef OMS_CONSUMER_H +#define OMS_CONSUMER_H + +#include + +#include "smart_pointer.h" +#include "ServiceLifecycle.h" +#include "MessageListener.h" +#include "Namespace.h" +#include "OMS.h" +#include "interceptor/ConsumerInterceptor.h" + +BEGIN_NAMESPACE_3(io, openmessaging, consumer) + /** + * A {@code Consumer} object to receive messages from multiple queues, these messages are pushed from + * MOM server to {@code Consumer} client. + * + * @version OMS 1.0 + * @see MessagingAccessPoint#createConsumer() + * @since OMS 1.0 + */ + class Consumer : public virtual ServiceLifecycle { + public: + virtual ~Consumer() { + + } + + + /** + * Resumes the {@code Consumer} after a suspend. + *

+ * This method resumes the {@code Consumer} instance after it was suspended. + * The instance will not receive new messages between the suspend and resume calls. + * + * @see Consumer#suspend() + */ + virtual void resume() = 0; + + /** + * Suspends the {@code Consumer} for later resumption. + *

+ * This method suspends the consumer until it is resumed. + * The consumer will not receive new messages between the suspend and resume calls. + *

+ * This method behaves exactly as if it simply performs the call {@code suspend(0)}. + * + * @see Consumer#resume() + */ + virtual void suspend(long timeout = 0) = 0; + + /** + * This method is used to find out whether the {@code Consumer} is suspended. + * + * @return true if this {@code Consumer} is suspended, false otherwise + */ + virtual bool isSuspended() = 0; + + /** + * Binds the {@code Consumer} to a specified queue, with a {@code MessageListener}. + *

+ * {@link MessageListener#onReceived(Message, MessageListener.Context)} will be called when new + * delivered message is coming. + * + * @param queueName a specified queue + * @param listener a specified listener to receive new message + * @return this {@code Consumer} instance + */ + virtual Consumer &bindQueue(const std::string &queueName, + const MessageListenerPtr &listener) = 0; + + /** + * Unbinds the {@code Consumer} from a specified queue. + *

+ * After the success call, this consumer won't receive new message + * from the specified queue any more. + * + * @param queueName a specified queue + * @return this {@code Consumer} instance + */ + virtual Consumer &unbindQueue(const std::string &queueName) = 0; + + /** + * Adds a {@code ConsumerInterceptor} instance to this consumer. + * + * @param interceptor an interceptor instance + */ + virtual void addInterceptor(const interceptor::ConsumerInterceptorPtr &interceptor) = 0; + + /** + * Removes an interceptor from this consumer. + * + * @param interceptor an interceptor to be removed + */ + virtual void removeInterceptor(const interceptor::ConsumerInterceptorPtr &interceptor) = 0; + + /** + * Receives the next message from the bind queues of this consumer in pull model. + *

+ * This call blocks indefinitely until a message is arrives, the timeout expires, or until this {@code PullConsumer} + * is shut down. + * + * @param timeout receive message will blocked at most timeout milliseconds. + * @return the next message received from the bind queues, or null if the consumer is concurrently shut down. + */ + virtual MessagePtr &receive(long timeout) = 0; + + /** + * Acknowledges the specified and consumed message with the unique message receipt handle, in the scenario of using + * manual commit. + *

+ * Messages that have been received but not acknowledged may be redelivered. + * + * @param receiptHandle the receipt handle associated with the consumed message. + */ + virtual void ack(String receiptHandle) = 0; + }; + + typedef NS::shared_ptr ConsumerPtr; + +END_NAMESPACE_3(io, openmessaging, consumer) + +#endif //OMS_CONSUMER_H diff --git a/consumer/Context.h b/consumer/Context.h index bbf56e8..c50dc22 100644 --- a/consumer/Context.h +++ b/consumer/Context.h @@ -1,3 +1,19 @@ +/* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + #ifndef OMS_CONTEXT_H #define OMS_CONTEXT_H @@ -12,8 +28,11 @@ BEGIN_NAMESPACE_3(io, openmessaging, consumer) virtual ~Context() { } - virtual KeyValuePtr attributes() = 0; - + /** + * Acknowledges the specified and consumed message, which is related to this {@code Context}. + *

+ * Messages that have been received but not acknowledged may be redelivered. + */ virtual void ack() = 0; }; diff --git a/consumer/MessageListener.h b/consumer/MessageListener.h index 231fcc7..acc3258 100644 --- a/consumer/MessageListener.h +++ b/consumer/MessageListener.h @@ -1,3 +1,19 @@ +/* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + #ifndef OMS_MESSAGE_LISTENER_H #define OMS_MESSAGE_LISTENER_H @@ -22,7 +38,15 @@ BEGIN_NAMESPACE_3(io, openmessaging, consumer) } - virtual void onMessage(MessagePtr &message, ContextPtr &context) = 0; + /** + * Callback method to receive incoming messages. + *

+ * A message listener should handle different types of {@code Message}. + * + * @param message the received message object. + * @param context the context delivered to the consume thread. + */ + virtual void onReceived(MessagePtr &message, ContextPtr &context) = 0; }; typedef NS::shared_ptr MessageListenerPtr; diff --git a/consumer/PullConsumer.h b/consumer/PullConsumer.h deleted file mode 100644 index acfcb8e..0000000 --- a/consumer/PullConsumer.h +++ /dev/null @@ -1,91 +0,0 @@ -#ifndef OMS_PULL_CONSUMER_H -#define OMS_PULL_CONSUMER_H - -#include - -#include "smart_pointer.h" -#include "KeyValue.h" -#include "ServiceLifecycle.h" -#include "Namespace.h" -#include "Message.h" -#include "OMS.h" - -BEGIN_NAMESPACE_3(io, openmessaging, consumer) - - /** - * A {@code PullConsumer} object can poll messages from the specified queue, - * and supports submit the consume result by acknowledgement. - * - * @version OMS 1.0 - * @see MessagingAccessPoint#createPullConsumer(String) - * @since OMS 1.0 - */ - class PullConsumer : public virtual ServiceLifecycle { - public: - virtual ~PullConsumer() { - - } - - /** - * Returns the attributes of this {@code PullConsumer} instance. - * Changes to the return {@code KeyValue} are not reflected in physical {@code PullConsumer}. - *

- * There are some standard attributes defined by OMS for {@code PullConsumer}: - *

    - *
  • {@link OMSBuiltinKeys#CONSUMER_ID}, the unique consumer id for a consumer instance. - *
  • {@link OMSBuiltinKeys#OPERATION_TIMEOUT}, the default timeout period for operations of {@code PullConsumer}. - *
- * - * @return the attributes - */ - virtual KeyValuePtr attributes() = 0; - - /** - * Attaches the {@code PullConsumer} to a specified queue. - * - * @param queueName a specified queue - * @return this {@code PullConsumer} instance - */ - virtual PullConsumer& attachQueue(const std::string &queueName, const KeyValuePtr &properties = kv_nullptr) = 0; - - /** - * Detaches the {@code PullConsumer} from a specified queue. - *

- * After the success call, this consumer won't receive new message - * from the specified queue any more. - * - * @param queueName a specified queue - * @return this {@code PullConsumer} instance - */ - virtual PullConsumer& detachQueue(const std::string &queueName) = 0; - - /** - * Receives the next message from the attached queues of this consumer. - *

- * This call blocks indefinitely until a message is arrives, the timeout expires, - * or until this {@code PullConsumer} is shut down. - * - * @return the next message received from the attached queues, or null if the consumer is - * concurrently shut down or the timeout expires - * @throws OMSRuntimeException if the consumer fails to pull the next message due to some internal error. - */ - virtual MessagePtr receive(const KeyValuePtr &props = kv_nullptr) = 0; - - /** - * Acknowledges the specified and consumed message with the specified attributes. - *

- * Messages that have been received but not acknowledged may be redelivered. - * - * @param messageId the consumed message id - * @throws OMSRuntimeException if the consumer fails to acknowledge the messages due to some internal error. - */ - virtual void ack(const std::string &messageId, - const KeyValuePtr &props = kv_nullptr) = 0; - }; - - typedef NS::shared_ptr PullConsumerPtr; - - -END_NAMESPACE_3(io, openmessaging, consumer) - -#endif //OMS_PULL_CONSUMER_H diff --git a/consumer/PushConsumer.h b/consumer/PushConsumer.h deleted file mode 100644 index 30ebdeb..0000000 --- a/consumer/PushConsumer.h +++ /dev/null @@ -1,117 +0,0 @@ -#ifndef OMS_PUSH_CONSUMER_H -#define OMS_PUSH_CONSUMER_H - -#include - -#include "smart_pointer.h" -#include "ServiceLifecycle.h" -#include "MessageListener.h" -#include "Namespace.h" -#include "OMS.h" -#include "interceptor/ConsumerInterceptor.h" - -BEGIN_NAMESPACE_3(io, openmessaging, consumer) - /** - * A {@code PushConsumer} object to receive messages from multiple queues, these messages are pushed from - * MOM server to {@code PushConsumer} client. - * - * @version OMS 1.0 - * @see MessagingAccessPoint#createPushConsumer() - * @since OMS 1.0 - */ - class PushConsumer : public virtual ServiceLifecycle { - public: - virtual ~PushConsumer() { - - } - - /** - * Returns the attributes of this {@code PushConsumer} instance. - * Changes to the return {@code KeyValue} are not reflected in physical {@code PushConsumer}. - *

- * There are some standard attributes defined by OMS for {@code PushConsumer}: - *

    - *
  • {@link OMSBuiltinKeys#CONSUMER_ID}, the unique consumer id for a consumer instance. - *
  • {@link OMSBuiltinKeys#OPERATION_TIMEOUT}, the default timeout period for operations of {@code PushConsumer}. - *
- * - * @return the attributes - */ - virtual KeyValuePtr attributes() = 0; - - /** - * Resumes the {@code PushConsumer} after a suspend. - *

- * This method resumes the {@code PushConsumer} instance after it was suspended. - * The instance will not receive new messages between the suspend and resume calls. - * - * @throws OMSRuntimeException if the instance has not been suspended. - * @see PushConsumer#suspend() - */ - virtual void resume() = 0; - - /** - * Suspends the {@code PushConsumer} for later resumption. - *

- * This method suspends the consumer until it is resumed. - * The consumer will not receive new messages between the suspend and resume calls. - *

- * This method behaves exactly as if it simply performs the call {@code suspend(0)}. - * - * @throws OMSRuntimeException if the instance is not currently running. - * @see PushConsumer#resume() - */ - virtual void suspend(long timeout = 0) = 0; - - /** - * This method is used to find out whether the {@code PushConsumer} is suspended. - * - * @return true if this {@code PushConsumer} is suspended, false otherwise - */ - virtual bool isSuspended() = 0; - - /** - * Attaches the {@code PushConsumer} to a specified queue, with a {@code MessageListener}. - *

- * {@link MessageListener#onReceived(Message, MessageListener.Context)} will be called when new - * delivered message is coming. - * - * @param queueName a specified queue - * @param listener a specified listener to receive new message - * @return this {@code PushConsumer} instance - */ - virtual PushConsumer &attachQueue(const std::string &queueName, - const MessageListenerPtr &listener, - const KeyValuePtr &properties = kv_nullptr) = 0; - - /** - * Detaches the {@code PushConsumer} from a specified queue. - *

- * After the success call, this consumer won't receive new message - * from the specified queue any more. - * - * @param queueName a specified queue - * @return this {@code PushConsumer} instance - */ - virtual PushConsumer &detachQueue(const std::string &queueName) = 0; - - /** - * Adds a {@code PushConsumerInterceptor} instance to this consumer. - * - * @param interceptor an interceptor instance - */ - virtual void addInterceptor(const interceptor::ConsumerInterceptorPtr &interceptor) = 0; - - /** - * Removes an interceptor from this consumer. - * - * @param interceptor an interceptor to be removed - */ - virtual void removeInterceptor(const interceptor::ConsumerInterceptorPtr &interceptor) = 0; - }; - - typedef NS::shared_ptr PushConsumerPtr; - -END_NAMESPACE_3(io, openmessaging, consumer) - -#endif //OMS_PUSH_CONSUMER_H diff --git a/consumer/StreamingConsumer.h b/consumer/StreamingConsumer.h deleted file mode 100644 index f2cf573..0000000 --- a/consumer/StreamingConsumer.h +++ /dev/null @@ -1,39 +0,0 @@ -#ifndef OMS_STREAM_CONSUMER_H -#define OMS_STREAM_CONSUMER_H - -#include -#include - -#include "smart_pointer.h" -#include "KeyValue.h" -#include "ServiceLifecycle.h" -#include "StreamingIterator.h" -#include "Namespace.h" - -BEGIN_NAMESPACE_3(io, openmessaging, consumer) - /** - * A {@code Queue} is divided by many streams. - *

- * A {@code StreamingConsumer} object supports consume messages from a - * specified partition like a iterator. - * - * @version OMS 1.0 - * @see Stream - * @since OMS 1.0 - */ - class StreamingConsumer : public virtual ServiceLifecycle { - public: - virtual ~StreamingConsumer() { - - } - - virtual KeyValuePtr attributes() = 0; - - virtual StreamingIteratorPtr seek(const std::string &name, long offset, int whence) = 0; - - }; - typedef NS::shared_ptr StreamingConsumerPtr; - -END_NAMESPACE_3(io, openmessaging, consumer) - -#endif //OMS_STREAM_CONSUMER_H diff --git a/consumer/StreamingIterator.h b/consumer/StreamingIterator.h deleted file mode 100644 index 020b33c..0000000 --- a/consumer/StreamingIterator.h +++ /dev/null @@ -1,35 +0,0 @@ -#ifndef OMS_STREAMING_ITERATOR_H -#define OMS_STREAMING_ITERATOR_H - - -#include "smart_pointer.h" -#include "ServiceLifecycle.h" -#include "Namespace.h" -#include "Message.h" - -BEGIN_NAMESPACE_3(io, openmessaging, consumer) - - class StreamingIterator : public virtual ServiceLifecycle { - public: - virtual ~StreamingIterator() { - - } - - virtual KeyValuePtr attributes() = 0; - - virtual void commit(bool flush) = 0; - - virtual bool hasNext() = 0; - - virtual MessagePtr next() = 0; - - virtual bool hasPrevious() = 0; - - virtual MessagePtr previous() = 0; - }; - - typedef NS::shared_ptr StreamingIteratorPtr; - -END_NAMESPACE_3(io, openmessaging, consumer) - -#endif //OMS_STREAMING_ITERATOR_H diff --git a/interceptor/ConsumerInterceptor.h b/interceptor/ConsumerInterceptor.h index d1cda89..1f7d13a 100644 --- a/interceptor/ConsumerInterceptor.h +++ b/interceptor/ConsumerInterceptor.h @@ -1,3 +1,19 @@ +/* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + #ifndef OMS_CONSUMER_INTERCEPTOR_H #define OMS_CONSUMER_INTERCEPTOR_H @@ -9,7 +25,7 @@ BEGIN_NAMESPACE_3(io, openmessaging, interceptor) /** - * A {@code PushConsumerInterceptor} is used to intercept consume operations of push consumer. + * A {@code ConsumerInterceptor} is used to intercept consume operations of consumer. * * @version OMS 1.0 * @since OMS 1.0 @@ -21,10 +37,21 @@ BEGIN_NAMESPACE_3(io, openmessaging, interceptor) } virtual std::string name() const = 0; - - virtual void preReceive(const MessagePtr &message, const KeyValuePtr &attributes = kv_nullptr) = 0; - - virtual void postReceive(const MessagePtr &message, const KeyValuePtr &attributes = kv_nullptr) = 0; + /** + * Invoked before the invocation of {@link MessageListener#onReceived(Message, Context)}. + * + * @param message the message is actually received. + * @param attributes the extensible attributes delivered to the intercept thread. + */ + virtual void preReceive(const MessagePtr &message, const InterceptorContextPtr &attributes = kv_nullptr) = 0; + + /** + * Invoked after the invocation of {@link MessageListener#onReceived(Message, MessageListener.Context)}. + * + * @param message the message is actually received. + * @param attributes the extensible attributes delivered to the intercept thread. + */ + virtual void postReceive(const MessagePtr &message, const InterceptorContextPtr &attributes = kv_nullptr) = 0; }; typedef NS::shared_ptr ConsumerInterceptorPtr; diff --git a/interceptor/InterceptorContext.h b/interceptor/InterceptorContext.h new file mode 100644 index 0000000..a5da8fb --- /dev/null +++ b/interceptor/InterceptorContext.h @@ -0,0 +1,42 @@ +/* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef OMS_INTERCEPTOR_CONTEXT_H +#define OMS_INTERCEPTOR_CONTEXT_H + +#include "Namespace.h" +#include "OMS.h" +#include "Uncopyable.h" + +BEGIN_NAMESPACE_3(io, openmessaging, interceptor) + + class InterceptorContext : private Uncopyable { + public: + virtual ~InterceptorContext() { + } + /** + * Returns the attributes of this {@code InterceptorContext} instance. + * + * @return the attributes. + */ + virtual KeyValuePtr attributes() = 0; + + }; + + typedef NS::shared_ptr InterceptorContextPtr; + +END_NAMESPACE_3(io, openmessaging, interceptor) +#endif //OMS_INTERCEPTOR_CONTEXT_H diff --git a/interceptor/ProducerInterceptor.h b/interceptor/ProducerInterceptor.h index f30cdfe..41058e4 100644 --- a/interceptor/ProducerInterceptor.h +++ b/interceptor/ProducerInterceptor.h @@ -1,3 +1,19 @@ +/* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + #ifndef OMS_PRODUCER_INTERCEPTOR_H #define OMS_PRODUCER_INTERCEPTOR_H @@ -22,9 +38,23 @@ BEGIN_NAMESPACE_3(io, openmessaging, interceptor) virtual std::string name() const = 0; - virtual void preSend(const MessagePtr &message, const KeyValuePtr &attributes = kv_nullptr) = 0; + /** + * Invoked before the message is actually sent to the network. + *

+ * This allows for modification of the message if necessary. + * + * @param message a message will be sent. + * @param attributes the extensible attributes delivered to the intercept thread. + */ + virtual void preSend(const MessagePtr &message, const InterceptorContextPtr &context = kv_nullptr) = 0; - virtual void postSend(const MessagePtr &message, const KeyValuePtr &attributes = kv_nullptr) = 0; + /** + * Invoked immediately after the successful send invocation. + * + * @param message the message is actually sent. + * @param attributes the extensible attributes delivered to the intercept thread. + */ + virtual void postSend(const MessagePtr &message, const InterceptorContextPtr &context = kv_nullptr) = 0; }; typedef NS::shared_ptr ProducerInterceptorPtr; diff --git a/manager/ResourceManager.h b/manager/ResourceManager.h new file mode 100644 index 0000000..8f42cbf --- /dev/null +++ b/manager/ResourceManager.h @@ -0,0 +1,130 @@ +/* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef OMS_RESOURCE_MANAGER_H +#define OMS_RESOURCE_MANAGER_H + +#include + +#include "../smart_pointer.h" +#include "../ServiceLifecycle.h" +#include "../KeyValue.h" +#include "../routing/Routing.h" +#include "../Namespace.h" + +BEGIN_NAMESPACE_3(io, openmessaging, manager) + + /** + * The {@code ResourceManager} is to provide a unified interface of resource management, + * allowing developers to manage the namespace, queue and routing resources. + *

+ * Create, set, get and delete are the four basic operations of {@code ResourceManager}. + *

+ * {@code ResourceManager} also supports fetching and updating resource attributes dynamically. + *

+ * {@link MessagingAccessPoint#resourceManager()} ()} is the unique method to obtain a {@code ResourceManager} + * instance. Changes made through this instance will immediately apply to the message-oriented middleware (MOM) behind + * {@code MessagingAccessPoint}. + *

+ * All operations conducted via this instance are confined to the configured namespace, + * with default namespace derived from the OMS driver url of {@code MessagingAccessPoint}.Change namespace + * by {@link ResourceManager#switchNamespace(String)} whenever necessary. + * + * @version OMS 1.0.0 + * @since OMS 1.0.0 + */ + class ResourceManager : public virtual Uncopyable { + public: + virtual ~ResourceManager() { + + } + + /** + * Creates a {@code Namespace} resource with some preset attributes. + * + * @param ns the name of the new namespace + */ + virtual void createNamespace(const std::string &ns) = 0; + + /** + * Deletes an existing namespace resource. + * + * @param ns the namespace to delete + */ + virtual void deleteNamespace(const std::string &ns) = 0; + + /** + * Gets the namespace list in the current {@code MessagingAccessPoint}. + * + * @return the list of all namespaces + */ + virtual std::vector listNamespaces() = 0; + + /** + * Switches the default namespace to the new one, and all the operations will reflect to + * the new namespace after the method returns successfully. + * + * @param ns the target namespace to switch to + */ + virtual void switchNamespace(const std::string &ns) = 0; + + /** + * Creates a {@code Queue} resource in the configured namespace with some preset attributes. + * + * @param queueName the name of the new queue + * @param attributes the preset attributes + */ + virtual void createQueue(const std::string &ns, const std::string &queue, KeyValuePtr attributes) = 0; + + /** + * Deletes an existing queue resource. + * + * @param queueName the queue needs to be deleted + */ + virtual void deleteQueue(const std::string &queue) = 0; + + /** + * Gets the queue list in the configured namespace. + * + * @return the list of all queues + */ + virtual std::vector listQueues() = 0; + + /** + * Routing from sourceQueue to targetQueue. Both queues are could be received messages after creating route action. + * + * @param sourceQueue source queue, process messages received from producer and duplicate those to target queue. + * @param targetQueue receive messages from source queue. + */ + virtual void routing(const std::string &sourceQueue, const std::string &targetQueue) = 0; + + /** + * In order to enable consumers to get the message in the specified mode, the filter rule follows the sql standard + * to filter out messages. + * + * @param queueName queue name. + * @param filterString SQL expression to filter out messages. + */ + virtual void filter(const std::string &queue, const std::string &filter) = 0; + + + }; + + typedef NS::shared_ptr ResourceManagerPtr; + +END_NAMESPACE_3(io, openmessaging, manager) + +#endif // OMS_RESOURCE_MANAGER_H diff --git a/observer/OMSEvent.h b/observer/OMSEvent.h deleted file mode 100644 index 71c1d81..0000000 --- a/observer/OMSEvent.h +++ /dev/null @@ -1,29 +0,0 @@ -#ifndef OMS_OMS_EVENT_H -#define OMS_OMS_EVENT_H - -#include "Namespace.h" -#include "Uncopyable.h" - -BEGIN_NAMESPACE_3(io, openmessaging, observer) - - /** - * Observable objects can trigger a {@code OMSEvent}, which will be handled in {@link Observer#onEvent(OMSEvent)} - * - * @version OMS 1.0 - * @since OMS 1.0 - */ - class OMSEvent : private Uncopyable { - public: - virtual ~OMSEvent() { - - } - - virtual std::string type() = 0; - - virtual std::string details() = 0; - }; - - typedef NS::shared_ptr OMSEventPtr; - -END_NAMESPACE_3(io, openmessaging, observer) -#endif //OMS_OMS_EVENT_H diff --git a/observer/OMSExceptionEvent.h b/observer/OMSExceptionEvent.h deleted file mode 100644 index 4850e65..0000000 --- a/observer/OMSExceptionEvent.h +++ /dev/null @@ -1,22 +0,0 @@ -#ifndef OMS_OMS_EXCEPTION_EVENT_H -#define OMS_OMS_EXCEPTION_EVENT_H - -#include "OMSEvent.h" -#include "Namespace.h" -BEGIN_NAMESPACE_3(io, openmessaging, observer) - - /** - * Special {@link OMSEvent} which is abnormal. - * - * @version OMS 1.0 - * @since OMS 1.0 - */ - class OMSExceptionEvent : public virtual OMSEvent { - public: - virtual ~OMSExceptionEvent() { - - } - }; - -END_NAMESPACE_3(io, openmessaging, observer) -#endif //OMS_OMS_EXCEPTION_EVENT_H diff --git a/observer/Observer.h b/observer/Observer.h deleted file mode 100644 index e21749f..0000000 --- a/observer/Observer.h +++ /dev/null @@ -1,31 +0,0 @@ -#ifndef OMS_OBSERVER_H -#define OMS_OBSERVER_H - -#include "smart_pointer.h" -#include "OMSEvent.h" -#include "Namespace.h" -#include "Uncopyable.h" - -BEGIN_NAMESPACE_3(io, openmessaging, observer) - - /** - * A {@code Observer} interface is used to observe the {@code OMSEvent} - * dispatches in observable objects, like {@link MessagingAccessPoint} - * - * @version OMS 1.0 - * @since OMS 1.0 - */ - class Observer : private Uncopyable { - public: - virtual ~Observer() { - - } - - virtual void onEvent(const OMSEvent &event) = 0; - }; - - typedef NS::shared_ptr ObserverPtr; - -END_NAMESPACE_3(io, openmessaging, observer) - -#endif //OMS_OBSERVER_H diff --git a/producer/BatchMessageSender.h b/producer/BatchMessageSender.h deleted file mode 100644 index 139ec83..0000000 --- a/producer/BatchMessageSender.h +++ /dev/null @@ -1,35 +0,0 @@ -#ifndef OMS_BATCHMESSAGESENDER_H -#define OMS_BATCHMESSAGESENDER_H - -#include "Namespace.h" -#include "Message.h" -#include "Uncopyable.h" - -BEGIN_NAMESPACE_3(io, openmessaging, producer) - /** - * A message sender created through {@link Producer#createSequenceBatchMessageSender()}, to send - * messages in batch way, and commit or roll back at the appropriate time. - * - * @version OMS 1.0 - * @since OMS 1.0 - */ - class BatchMessageSender : private Uncopyable { - public: - virtual ~BatchMessageSender() { - - } - - virtual BatchMessageSender& send(MessagePtr &message) = 0; - - virtual void commit() = 0; - - virtual void rollback() = 0; - - virtual void close() = 0; - }; - - typedef NS::shared_ptr BatchMessageSenderPtr; - -END_NAMESPACE_3(io, openmessaging, producer) - -#endif //OMS_BATCHMESSAGESENDER_H diff --git a/producer/CheckContext.h b/producer/CheckContext.h index 17fbde6..5676180 100644 --- a/producer/CheckContext.h +++ b/producer/CheckContext.h @@ -1,3 +1,19 @@ +/* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + #ifndef OMS_CHECK_CONTEXT_H #define OMS_CHECK_CONTEXT_H @@ -11,10 +27,18 @@ BEGIN_NAMESPACE_3(io, openmessaging, producer) virtual ~CheckContext() { } - + /** + * Commits a transaction. + */ virtual void commit() = 0; - + /** + * Rolls back a transaction. + */ virtual void rollback() = 0; + /** + * Unknown transaction status, may be this transaction still on going. + */ + virtual void unknown() = 0; }; diff --git a/producer/ExecutionContext.h b/producer/ExecutionContext.h deleted file mode 100644 index 02d344b..0000000 --- a/producer/ExecutionContext.h +++ /dev/null @@ -1,25 +0,0 @@ -#ifndef OMS_EXECUTION_CONTEXT_H -#define OMS_EXECUTION_CONTEXT_H - -#include "Namespace.h" -#include "Uncopyable.h" - -BEGIN_NAMESPACE_3(io, openmessaging, producer) - - class ExecutionContext : private Uncopyable { - public: - virtual ~ExecutionContext() { - - } - - virtual void commit() = 0; - - virtual void rollback() = 0; - - }; - - typedef NS::shared_ptr ExecutionContextPtr; - -END_NAMESPACE_3(io, openmessaging, producer) - -#endif //OMS_EXECUTION_CONTEXT_H diff --git a/producer/LocalTransactionExecutor.h b/producer/LocalTransactionExecutor.h deleted file mode 100644 index 67afac0..0000000 --- a/producer/LocalTransactionExecutor.h +++ /dev/null @@ -1,43 +0,0 @@ -#ifndef OMS_LOCAL_TRANSACTION_EXECUTOR_H -#define OMS_LOCAL_TRANSACTION_EXECUTOR_H - -#include "Namespace.h" -#include "Message.h" -#include "producer/ExecutionContext.h" -#include "producer/CheckContext.h" -#include "Uncopyable.h" - -BEGIN_NAMESPACE_3(io, openmessaging, producer) - - /** - * Each executor will be associated with a transactional message, can be used to execute local transaction - * branch and submit the transaction status(commit or rollback). - *

- * - * The associated message will be exposed to consumer when the local transaction has been committed, or be - * discarded if local transaction has been rolled back. - * - *

- * If the executor don't submit the transaction status for a long time, the server may lookup it forwardly through - * {@link LocalTransactionBranchExecutor#checkLocalTransactionBranch(Message, CheckLocalTransactionBranchContext)} - * - * @version OMS 1.0 - * @since OMS 1.0 - */ - class LocalTransactionExecutor : private Uncopyable { - public: - virtual ~LocalTransactionExecutor() { - - } - - virtual void execute(const MessagePtr &message, const ExecutionContextPtr &context) = 0; - - virtual void check(const MessagePtr &message, const CheckContextPtr &context) = 0; - - }; - - typedef NS::shared_ptr LocalTransactionExecutorPtr; - -END_NAMESPACE_3(io, openmessaging, producer) - -#endif //OMS_LOCAL_TRANSACTION_EXECUTOR_H diff --git a/producer/Producer.h b/producer/Producer.h index abe9ce0..6bc3c27 100644 --- a/producer/Producer.h +++ b/producer/Producer.h @@ -1,16 +1,33 @@ +/* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + #ifndef OMS_PRODUCER_H #define OMS_PRODUCER_H +#include #include "smart_pointer.h" #include "MessageFactory.h" #include "ServiceLifecycle.h" #include "Message.h" #include "SendResult.h" +#include "TransactionalSendResult.h" #include "Namespace.h" #include "OMS.h" -#include "LocalTransactionExecutor.h" +#include "TransactionStateCheckListener.h" #include "Future.h" -#include "BatchMessageSender.h" #include "interceptor/ProducerInterceptor.h" BEGIN_NAMESPACE_3(io, openmessaging, producer) @@ -43,51 +60,15 @@ BEGIN_NAMESPACE_3(io, openmessaging, producer) } - /** - * Returns the attributes of this {@code Producer} instance. - * Changes to the return {@code KeyValue} are not reflected in physical {@code Producer}. - *

- * There are some standard attributes defined by OMS for {@code Producer}: - *

    - *
  • {@link OMSBuiltinKeys#PRODUCER_ID}, the unique producer id for a producer instance. - *
  • {@link OMSBuiltinKeys#OPERATION_TIMEOUT}, the default timeout period for operations of {@code Producer}. - *
- * - * @return the attributes - */ - virtual KeyValuePtr attributes() = 0; - /** * Sends a message to the specified destination synchronously, the destination should be preset to - * {@link Message#sysHeaders()}, other header fields as well. + * {@link Message#Headers()}, other header fields as well. * * @param message a message will be sent * @return the successful {@code SendResult} - * @throws OMSMessageFormatException if an invalid message is specified. - * @throws OMSTimeOutException if the given timeout elapses before the send operation completes - * @throws OMSRuntimeException if the {@code Producer} fails to send the message due to some internal error. */ - virtual SendResultPtr send(const MessagePtr &message, - const KeyValuePtr &properties = kv_nullptr) = 0; + virtual SendResultPtr send(const MessagePtr &message) = 0; - /** - * Sends a transactional message to the specified destination synchronously, using the specified attributes, - * the destination should be preset to {@link Message#sysHeaders()}, other header fields as well. - *

- * A transactional message will be exposed to consumer if and only if the local transaction - * branch has been committed, or be discarded if local transaction has been rolled back. - * - * @param message a transactional message will be sent - * @param branchExecutor local transaction executor associated with the message - * @param attributes the specified attributes - * @return the successful {@code SendResult} - * @throws OMSMessageFormatException if an invalid message is specified. - * @throws OMSTimeOutException if the given timeout elapses before the send operation completes - * @throws OMSRuntimeException if the {@code Producer} fails to send the message due to some internal error. - */ - virtual SendResultPtr send(const MessagePtr &message, - const LocalTransactionExecutorPtr &executor, - const KeyValuePtr &properties) = 0; /** * Asynchronously send a message to its destination, which is specified in system headers. @@ -100,7 +81,7 @@ BEGIN_NAMESPACE_3(io, openmessaging, producer) * @param properties Optional additional properties. * @return Smart pointer to Future instance. */ - virtual FuturePtr sendAsync(const MessagePtr &message, const KeyValuePtr &properties = kv_nullptr) = 0; + virtual FuturePtr sendAsync(const MessagePtr &message) = 0; /** * Sends a message to the specified destination in one way, using the specified attributes, the destination @@ -112,14 +93,14 @@ BEGIN_NAMESPACE_3(io, openmessaging, producer) * @param message a message will be sent * @param properties the specified userHeaders */ - virtual void sendOneway(const MessagePtr &message, const KeyValuePtr &properties = kv_nullptr) = 0; + virtual void sendOneway(const MessagePtr &message) = 0; /** * Creates a {@code BatchMessageSender} to send message in batch way. * * @return a {@code BatchMessageSender} instance */ - virtual BatchMessageSenderPtr createSequenceBatchMessageSender() = 0; + virtual SendResultPtr send(const list &message) = 0; /** * Adds a {@code ProducerInterceptor} to intercept send operations of producer. @@ -134,6 +115,8 @@ BEGIN_NAMESPACE_3(io, openmessaging, producer) * @param interceptor a producer interceptor will be removed */ virtual void removeInterceptor(const interceptor::ProducerInterceptorPtr &interceptor) = 0; + + TransactionalResultPtr prepare(const MessagePtr &message) = 0; }; typedef NS::shared_ptr ProducerPtr; diff --git a/producer/SendResult.h b/producer/SendResult.h index ad2bf15..4ce9c09 100644 --- a/producer/SendResult.h +++ b/producer/SendResult.h @@ -1,3 +1,19 @@ +/* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + #ifndef OMS_SEND_RESULT_H #define OMS_SEND_RESULT_H diff --git a/producer/TransactionStateCheckListener.h b/producer/TransactionStateCheckListener.h new file mode 100644 index 0000000..c791cbe --- /dev/null +++ b/producer/TransactionStateCheckListener.h @@ -0,0 +1,62 @@ +/* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef OMS_TRANSACTION_CHECK_LISTENER_H +#define OMS_TRANSACTION_CHECK_LISTENER_H + +#include "Namespace.h" +#include "Message.h" +#include "producer/ExecutionContext.h" +#include "producer/CheckContext.h" +#include "Uncopyable.h" + +BEGIN_NAMESPACE_3(io, openmessaging, producer) + + /** + * Each executor will be associated with a transactional message, can be used to execute local transaction + * branch and submit the transaction status(commit or rollback). + *

+ * + * The associated message will be exposed to consumer when the local transaction has been committed, or be + * discarded if local transaction has been rolled back. + * + *

+ * If the executor don't submit the transaction status for a long time, the server may lookup it forwardly through + * {@link LocalTransactionBranchExecutor#checkLocalTransactionBranch(Message, CheckLocalTransactionBranchContext)} + * + * @version OMS 1.0 + * @since OMS 1.0 + */ + class TransactionStateCheckListener : private Uncopyable { + public: + virtual ~TransactionStateCheckListener() { + + } + /** + * Checks the status of the local transaction branch. + * + * @param message the associated message. + * @param checkContext the check context. + */ + virtual void check(const MessagePtr &message, const CheckContextPtr &checkContext) = 0; + + }; + + typedef NS::shared_ptr TransactionStateCheckListenerPtr; + +END_NAMESPACE_3(io, openmessaging, producer) + +#endif //OMS_TRANSACTION_CHECK_LISTENER_H diff --git a/producer/TransactionalSendResult.h b/producer/TransactionalSendResult.h new file mode 100644 index 0000000..2a90395 --- /dev/null +++ b/producer/TransactionalSendResult.h @@ -0,0 +1,63 @@ +/* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef OMS_TRANSACTIONAL_SEND_RESULT_H +#define OMS_TRANSACTIONAL_SEND_RESULT_H + +#include + +#include "smart_pointer.h" +#include "Namespace.h" +#include "Uncopyable.h" + +BEGIN_NAMESPACE_3(io, openmessaging, producer) + + /** + * The result of sending a OMS message to server + * with the message id and some properties. + * + * @version OMS 1.0 + * @since OMS 1.0 + */ + class TransactionalSendResult : public SendResult { + public: + virtual ~TransactionalSendResult() { + + } + + /** + * The unique transactionId id related to the {@code TransactionResult} instance. + * + * @return the transactional id + */ + virtual std::string transactionId() = 0; + + /** + * Commits a transaction. + */ + virtual void commit() = 0; + + /** + * Rolls back a transaction. + */ + virtual void rollback() = 0; + }; + + typedef NS::shared_ptr TransactionalSendResultPtr; + +END_NAMESPACE_3(io, openmessaging, producer) + +#endif // OMS_TRANSACTIONAL_SEND_RESULT_H diff --git a/routing/Operator.h b/routing/Operator.h deleted file mode 100644 index fe0b8a9..0000000 --- a/routing/Operator.h +++ /dev/null @@ -1,34 +0,0 @@ -#ifndef OMS_OPERATOR_H -#define OMS_OPERATOR_H - -#include "smart_pointer.h" -#include "KeyValue.h" -#include "Namespace.h" -#include "Uncopyable.h" - -BEGIN_NAMESPACE_3(io, openmessaging, routing) - - /** - * A {@code Operator} is used to handle the flowing messages in {@code Routing}. - * - * There are many kinds of {@code Operator}, expression operator, deduplicate operator, - * joiner operator, filter operator, rpc operator, and so on. - * - * @version OMS 1.0 - * @since OMS 1.0 - */ - class Operator : private Uncopyable { - public: - virtual ~Operator() { - - } - - virtual KeyValuePtr properties() = 0; - - virtual std::string expression() = 0; - }; - typedef NS::shared_ptr OperatorPtr; - -END_NAMESPACE_3(io, openmessaging, routing) - -#endif //OMS_OPERATOR_H diff --git a/routing/Routing.h b/routing/Routing.h deleted file mode 100644 index 726cb53..0000000 --- a/routing/Routing.h +++ /dev/null @@ -1,31 +0,0 @@ -#ifndef OMS_ROUTING_H -#define OMS_ROUTING_H - -#include - -#include "ServiceLifecycle.h" -#include "Namespace.h" - -BEGIN_NAMESPACE_3(io, openmessaging, routing) - - class Routing : public virtual ServiceLifecycle { - public: - virtual ~Routing() { - - } - - virtual KeyValuePtr properties() = 0; - - virtual std::string source() = 0; - - virtual std::string destination() = 0; - - virtual std::string expression() = 0; - - virtual std::string routingName() = 0; - }; - typedef NS::shared_ptr RoutingPtr; - -END_NAMESPACE_3(io, openmessaging, routing) - -#endif //OMS_ROUTING_H diff --git a/samples/ProducerApp.cpp b/samples/ProducerApp.cpp new file mode 100644 index 0000000..47e9c3d --- /dev/null +++ b/samples/ProducerApp.cpp @@ -0,0 +1,73 @@ +/* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "OpenMessaging.h" +#include "producer/Producer.h" + +#include + +using namespace std; +using namespace io::openmessaging; + +int main(int argc, char *argv[]) { + + MessagingAccessPoint *messagingAccessPoint = + OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org/us-east"); + + producer::ProducerPtr producer = messagingAccessPoint->createProducer(); + + interceptor::ProducerInterceptorPtr interceptor = new interceptor::ProducerInterceptor() { + @Override + public void preSend(MessagePtr &message, ContextPtr &attributes) { + } + + @Override + public void postSend(MessagePtr &message, ContextPtr &attributes) { + } + }; + producer->addInterceptor(interceptor); + + producer->start(); + + ByteMessagePtr message =producer->createMessage("NS://HELLO_QUEUE", new MessageBody("HELLO_BODY")); + + producer::SendResultPtr sendResult = producer.send(message); + cout<<"SendResult: " << sendResult << endl; + + //Sends a message to the specified destination async. + FuturePtr sendResultFuture = producer.sendAsync(message); + sendResult = sendResultFuture.get(1000); + cout<<"SendResult: " << sendResult << endl; + + //Sends a message to the specified destination in one way mode. + producer.sendOneway(message); + + //Sends messages to the specified destination in batch mode. + list messages(10); + for (int i = 0; i < 10; i++) { + MessagePtr msg = producer.createMessage("NS://HELLO_QUEUE", new MessageBody("HELLO_BODY")); + messages.push_back(msg); + } + producer.send(messages); + + producer.removeInterceptor(interceptor); + + producer.stop(); + + messages.clear(); + + return 0; +} diff --git a/samples/PullConsumerApp.cpp b/samples/PullConsumerApp.cpp new file mode 100644 index 0000000..ece25ff --- /dev/null +++ b/samples/PullConsumerApp.cpp @@ -0,0 +1,54 @@ +/* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "OpenMessaging.h" +#include "consumer/Consumer.h" + +#include + +using namespace std; +using namespace io::openmessaging; + +int main(int argc, char *argv[]) { + + MessagingAccessPoint *messagingAccessPoint = + OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org/us-east"); + + consumer::ConsumerPtr consumer = messagingAccessPoint->createConsumer(); + + interceptor::ConsumerInterceptorPtr interceptor = new interceptor::ConsumerInterceptor() { + @Override + public void preReceive(MessagePtr &message, ContextPtr &attributes) { + } + + @Override + public void postReceive(MessagePtr &message, ContextPtr &attributes) { + } + }; + consumer->addInterceptor(interceptor); + + consumer->bindQueue("NS://HELLO_QUEUE"); + + consumer->start(); + + ByteMessagePtr message = consumer->receive(1000); + + cout<<"Receive Message: " << ByteMessagePtr << endl; + + consumer.ack(ByteMessagePtr->headers()->getMessageId()); + + return 0; +} diff --git a/shared_ptr.hpp b/shared_ptr.hpp index af3b561..84103ee 100644 --- a/shared_ptr.hpp +++ b/shared_ptr.hpp @@ -1,12 +1,20 @@ -/** - * @file shared_ptr.hpp - * @brief shared_ptr is a minimal implementation of smart pointer, a subset of the C++11 std::shared_ptr or boost::shared_ptr. +/* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * Copyright (c) 2013-2014 Sebastien Rombauts (sebastien.rombauts@gmail.com) + * http://www.apache.org/licenses/LICENSE-2.0 * - * Distributed under the MIT License (MIT) (See accompanying file LICENSE.txt - * or copy at http://opensource.org/licenses/MIT) + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ + + #pragma once #include // NULL diff --git a/smart_pointer.h b/smart_pointer.h index afb77e8..558134a 100644 --- a/smart_pointer.h +++ b/smart_pointer.h @@ -1,3 +1,19 @@ +/* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + #ifndef OMS_SMART_POINTER_H #define OMS_SMART_POINTER_H @@ -70,7 +86,7 @@ BEGIN_NAMESPACE_2(io, openmessaging) bool copy_; }; -typedef ManagedArray MessageBody; +typedef ManagedArray MessageBody; END_NAMESPACE_2(io, openmessaging)