Skip to content

Commit

Permalink
initial commit
Browse files Browse the repository at this point in the history
need more documentation and test cases.
  • Loading branch information
robkooper committed Nov 8, 2017
0 parents commit c701045
Show file tree
Hide file tree
Showing 12 changed files with 569 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .Rbuildignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
^.*\.Rproj$
^\.Rproj\.user$
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
.Rproj.user
.Rhistory
.RData
.Ruserdata
src/*.o
src/*.so
12 changes: 12 additions & 0 deletions DESCRIPTION
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
Package: rrabbitmq
Type: Package
Title: Work with RabbitMQ Messaging System
Version: 1.0.0
Author: Rob Kooper <[email protected]>
Maintainer: Rob Kooper <[email protected]>
Description: Connect to RabbitMQ and send and receive messages on
specific queues.
License: Apache License 2.0
Encoding: UTF-8
LazyData: true
RoxygenNote: 6.0.1
6 changes: 6 additions & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Generated by roxygen2: do not edit by hand

export(rabbitmq_close)
export(rabbitmq_connect)
export(rabbitmq_create_queue)
useDynLib(rrabbitmq, .registration = TRUE)
104 changes: 104 additions & 0 deletions R/wrapper.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
#' Connect to a rabbitmq host.
#'
#' This function will connect to a rabbitmq server. The default host
#' it will connect to is localhost. This function needs to be called
#' once before all other functions are called. The function will
#' return TRUE if the connection is created successfully. If already
#' connected or the connection failed to create it will return FALSE.
#'
#' @param host the hostname of the RabbitMQ server
#' @param port the hostname of the RabbitMQ server
#' @param vhost the hostname of the RabbitMQ server
#' @return TRUE if the connection was created or FALSE if no
#' connection could be created.
#'
#' @examples
#' \dontrun{
#' stopifnot(rabbitmq_connect(host = "rabbitmq"))
#' }
#'
#' @export
#' @useDynLib rrabbitmq, .registration = TRUE
rabbitmq_connect <- function(host="localhost", port=5672, vhost="%2F") {
.Call(c_rabbitmq_connect, host)
}

#' Close an open connection to a rabbitmq host.
#'
#' This function will close the connection to a rabbitmq server. The
#' The function will return TRUE if the connection is closed, if there
#' is no connection, or the connection could not be closed it will
#' return FALSE.
#'
#' @return TRUE if the connection was closed or FALSE if the
#' connection could not be closed.
#'
#' @examples
#' \dontrun{
#' rabbitmq_close()
#' }
#'
#' @export
#' @useDynLib rrabbitmq, .registration = TRUE
rabbitmq_close <- function() {
.Call(c_rabbitmq_close)
}

#' Create a queue
#'
#' This function will connect to a rabbitmq server. The default host
#' it will connect to is localhost. This function needs to be called
#' once before all other functions are called. The function will
#' return TRUE if the connection is created successfully. If already
#' connected or the connection failed to create it will return FALSE.
#'
#' @param host the hostname of the RabbitMQ server
#' @return TRUE if the connection was created or FALSE if no
#' connection could be created.
#' @examples
#' \dontrun{
#' stopifnot(rabbitmq_connect(host = "rabbitmq"))
#' }
#'
#' @export
#' @useDynLib rrabbitmq, .registration = TRUE
rabbitmq_create_queue <- function(queue) {
.Call(c_rabbitmq_create_queue, queue)
}

rabbitmq_destroy_queue <- function(queue) {
.Call(c_rabbitmq_destroy_queue, queue)
}

rabbitmq_publish <- function(queue, message) {
.Call(c_rabbitmq_publish, queue, message)
}

rabbitmq_read_message <- function(queue, timeout=NULL) {
result <- .Call(c_rabbitmq_read_message, queue, timeout)
if (is.logical(result) && !result) {
return(list())
} else {
return(list(id=result[[1]], msg=result[[2]]))
}
}

rabbitmq_listen <- function(queue, callback, timeout=NULL) {
while(TRUE) {
result <- rabbitmq_read_message(queue, timeout)
if (length(result) == 0) {
return()
}
callback_result = do.call(callback, list(result$msg))
ack = (!is.logical(callback_result) || as.logical(callback_result))
rabbitmq_ack_message(result$id, ack)
}
}

rabbitmq_ack_message <- function(id, ack=TRUE) {
if(ack) {
.Call(c_rabbitmq_ack_message, id)
} else {
.Call(c_rabbitmq_nack_message, id)
}
}
24 changes: 24 additions & 0 deletions man/rabbitmq_close.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 32 additions & 0 deletions man/rabbitmq_connect.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 28 additions & 0 deletions man/rabbitmq_create_queue.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions rrabbitmq.Rproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
Version: 1.0

RestoreWorkspace: Default
SaveWorkspace: Default
AlwaysSaveHistory: Default

EnableCodeIndexing: Yes
UseSpacesForTab: Yes
NumSpacesForTab: 2
Encoding: UTF-8

RnwWeave: Sweave
LaTeX: pdfLaTeX

StripTrailingWhitespace: Yes

BuildType: Package
PackageUseDevtools: Yes
PackageInstallArgs: --no-multiarch --with-keep.source
PackageRoxygenize: rd,collate,namespace
2 changes: 2 additions & 0 deletions src/Makevars
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
C=clang
PKG_LIBS = -lrabbitmq
30 changes: 30 additions & 0 deletions src/init.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#include <R.h>
#include <Rinternals.h>
#include <stdlib.h>
#include <R_ext/Rdynload.h>

extern SEXP rabbitmq_connect(SEXP host);
extern SEXP rabbitmq_close();
extern SEXP rabbitmq_create_queue(SEXP queue);
extern SEXP rabbitmq_destroy_queue(SEXP queue);
extern SEXP rabbitmq_publish(SEXP queue, SEXP msg);
extern SEXP rabbitmq_read_message(SEXP queue, SEXP timeout);
extern SEXP rabbitmq_ack_message(SEXP message);
extern SEXP rabbitmq_nack_message(SEXP message);

static const R_CallMethodDef CallEntries[] = {
{"c_rabbitmq_connect", (DL_FUNC) &rabbitmq_connect, 1},
{"c_rabbitmq_close", (DL_FUNC) &rabbitmq_close, 0},
{"c_rabbitmq_create_queue", (DL_FUNC) &rabbitmq_create_queue, 1},
{"c_rabbitmq_destroy_queue", (DL_FUNC) &rabbitmq_destroy_queue, 1},
{"c_rabbitmq_publish", (DL_FUNC) &rabbitmq_publish, 2},
{"c_rabbitmq_read_message", (DL_FUNC) &rabbitmq_read_message, 2},
{"c_rabbitmq_ack_message", (DL_FUNC) &rabbitmq_ack_message, 1},
{"c_rabbitmq_nack_message", (DL_FUNC) &rabbitmq_nack_message, 1},
{NULL, NULL, 0}
};

void R_init_rrabbitmq(DllInfo *dll) {
R_registerRoutines(dll, NULL, CallEntries, NULL, NULL);
R_useDynamicSymbols(dll, FALSE);
}
Loading

0 comments on commit c701045

Please sign in to comment.