RはCの関数を呼ぶことが出来るだけではなく、Cの中に組み込むこともできます(Embedding R)。JavaもJNIで、C++のオブジェクトを呼ぶことも、Cのオブジェクトとして呼ばれることもできます。よって、RとJavaは相互に利用できます。
実際に使おうとするとC/C++の層が煩雑なのですが、RのrJavaパッケージを使うとC/C++の層を隠して使うことができ、かなり見通しが良くなります。rJavaパッケージは、RからJavaを呼び出す機能に、JavaからRを呼び出すJRIパッケージを統合したのです。
Rの拡張はC++とFortranで数値解析をするものが多く、また、システム関係のAPIもCで提供されることが多いので、rJava/JRIの使いどころはあまりなかったのですが、気づくと世の中の状況が変わっていました。RabbitMQは本家がC/C++のAPIを提供する気が無さそうですし、Apache PulsarのようなJavaで書かれたミドルウェアは、やはりJavaのクライアント用APIが第一です。
Rのサポートが無いミドルウェアを使う場合、Javaのクライアントを書いてRを呼び出すのが無難なケースも出てきました。ややこしそうな感じがしますが、使ってみたらそうでもなかったので、紹介します。
Javaのインストール
Ubuntu 24.04でOpenJDKをインストールします。
sudo apt update sudo apt install openjdk-21-jdk libbz2-dev
aptのパッケージ名が分からなくなったときは、apt search openjdk | grep -E 'openjdk-.*-jdk/'とすると、あたりがつくと思います。
環境変数の設定①
rJavaパッケージのインストールをする前に、環境変数の設定をしましょう。
# パッケージのインストール先 exort .R_LIBS=/home/username/.RLIBS # Rの bin/ etc/ lib/ modules/ があってCOPYINGが置いてあるディレクトリ export R_HOME=/usr/lib/R
rJavaのインストール
終わったら、Rでインストールします。
R -e 'install.packages("rJava")'
環境変数の設定②
Javaから$R_LIBS/rJava/jri以下のクラスライブラリを呼び出すので、CLASSPATHをそう設定します。また、$R_LIBS/rJava/jri/libjri.soをC++から呼ぶので、LD_LIBRARY_PATHを通します。
export CLASSPATH=$CLASSPATH:.:$R_LIBS/rJava/jri/JRI.jar:$R_LIBS/rJava/jri/JRIEngine.jar:$R_LIBS/rJava/jri/REngine.jar export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$R_LIBS/rJava/jri
Javaを動かすときに-Djava.library.path=$R_LIBS/rJava/jriをつければ不要なのですが、.bashrcにでも書いておいた方が楽だと思います。
JavaとRの連携の確認
簡単なコードで、JavaとRを連携させてみましょう。
ソースコードの編集
必要な設定はコレだけです。
import org.rosuda.JRI.REXP; import org.rosuda.JRI.Rengine; public class ExampleJRI { public static void main(String[] args) { // 第1引数の文字列の配列は、Rのコマンドラインの起動オプション Rengine engine = new Rengine(new String[]{"--no-save"}, false, null); // 変数をセット engine.assign("x", new int[]{121}); // evalして結果を取得 REXP result = engine.eval("sqrt(x)"); // 結果を表示 System.out.println(result.asDouble()); // Rengineのインスタンスを終了 engine.end(); } }
Rengineのインスタンスのメソッド#assignで変数をセットし、#evalで実行して、戻り値のresultを参照する流れになります。
ドキュメントが見当たらないので、細かいところはJRIのソースコードを読む必要があるのが難ですが、#assignはint/int/double/double/String/String[]の変換はディスパッチしてくれるのでシームレスですし、REXPも #asInt(), #asIntArray(), #asDouble(), #asDoubleArray(), #asDoubleMatrix(), #asBool(), #toString() を使う程度であれば困らないでしょう。
JavaのRabbitMQクライアントから、Rを呼び出す
もう少し実践的な例として、RabbitMQを使ってみます。
作業用ディレクトリを作成して移動
export rj_temp=/var/tmp/rabbitmq-java-wd mkdir $rj_temp cd $rj_temp
RabbitMQ Clientとslf4jをダウンロード
RabbitMQの公式指定のバージョンのバイナリーにします。
export v_slf4j=1.7.36 wget https://repo1.maven.org/maven2/org/slf4j/slf4j-api/${v_slf4j}/slf4j-api-${v_slf4j}.jar wget https://repo1.maven.org/maven2/org/slf4j/slf4j-simple/${v_slf4j}/slf4j-simple-${v_slf4j}.jar
落としたjarファイル2つを、CLASSPATHに追加しておきましょう。
export CLASSPATH=$CLASSPATH:$rj_temp/amqp-client-5.24.0.jar:$rj_temp/slf4j-api-${v_slf4j}.jar:$rj_temp/slf4j-simple-${v_slf4j}.jar
ソースコード
Send.javaはRabbitMQのチュートリアルの例そのまま、Recv.javaはRを呼び出すように書き換えたものです。また、Recv.javaでRにprepare.Rを読み込むように命令しているので、prepare.Rも用意します。
送信側
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.nio.charset.StandardCharsets; public class Send { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8)); System.out.println(" [x] Sent '" + message + "'"); } } }
受信側
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; import java.nio.charset.StandardCharsets; import org.rosuda.JRI.REXP; import org.rosuda.JRI.Rengine; public class Recv { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Rengine engine = new Rengine(new String[]{"--no-save"}, false, null); REXP result = engine.eval("source(\"prepare.R\")"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println(" [x] Received '" + message + "'"); engine.assign("msg", message); REXP r = engine.eval("when_receiving_message(msg)"); System.out.println("R returns " + r.asInt() + "."); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); engine.end(); } }
受信側の起動時に読み込むRのコード
when_receiving_message <- function(msg){ nchar(msg) }
RabbitMQの起動を確認
sudo service rabbitmq-server status
止まっていたら、
sudo service rabbitmq-server start
しましょう。
コンパイルと実行
コンパイルして実行すると、RでHello World!の文字数を数えて戻せているのが分かります。
javac Send.java Recv.java java Recv& java Send
なお、Recvは、fgしてCTRL+Cで止めてください。