Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add finagle-clojure/core.async, integration with clojure.core.async #17

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions core.async/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/target
/classes
/checkouts
pom.xml
pom.xml.asc
*.jar
*.class
/.lein-*
/.nrepl-port
.hgignore
.hg/
8 changes: 8 additions & 0 deletions core.async/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# finagle-clojure/core.async

Clojure core.async adapters for finagle-clojure.

### Dependency

[finagle-clojure/core.async "0.4.2-SNAPSHOT"]

15 changes: 15 additions & 0 deletions core.async/project.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
(defproject finagle-clojure/core.async "0.4.2-SNAPSHOT"
:description "Clojure core.async integration with finagle-clojure"
:url "https://github.com/twitter/finagle-clojure"
:license {:name "Apache License, Version 2.0"
:url "https://www.apache.org/licenses/LICENSE-2.0"}
:scm {:name "git" :url "http://github.com/finagle/finagle-clojure"}
:plugins [[lein-midje "3.2"]]
:profiles {:test {:dependencies [[midje "1.8.3" :exclusions [org.clojure/clojure]]]}
:dev [:test {:dependencies [[org.clojure/clojure "1.8.0"]]}]
:1.7 [:test {:dependencies [[org.clojure/clojure "1.7.0"]]}]
:1.6 [:test {:dependencies [[org.clojure/clojure "1.6.0"]]}]
:1.5 [:test {:dependencies [[org.clojure/clojure "1.5.1"]]}]
:1.4 [:test {:dependencies [[org.clojure/clojure "1.4.0"]]}]}
:dependencies [[finagle-clojure/core "0.4.2-SNAPSHOT"]
[org.clojure/core.async "0.2.374"]])
71 changes: 71 additions & 0 deletions core.async/src/finagle_clojure/core_async.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
(ns finagle-clojure.core-async
"Adapters to use Futures with core.async."
(:require [finagle-clojure.futures :as f]
[finagle-clojure.scala :as scala]
[clojure.core.async :as a])
(:import [com.twitter.util Future]))

(defn ^:no-doc throw-error
[o]
(if (instance? Throwable o)
(throw o)
o))

;; this needs to be a macro so it expands in the scope of the enclosing go block
;; otherwise the async take complains that it isn't in a go block
;; throw-error needs to be public so it's visible in the macro
(defmacro <?
"Similar to `clojure.core.async/<!`, but will throw instances of Throwable.

*Arguments*:

* `c`: a core.async chan

*Returns*:

The value from `c`, or throws it if it's `Throwable`."
[c]
`(throw-error (a/<! ~c)))

(defn <??
"Similar to `clojure.core.async/<!!`, but will throw instances of Throwable.

*Arguments*:

* `c`: a core.async chan

*Returns*:

The value from `c`, or throws it if it's `Throwable`."
[c]
(throw-error (a/<!! c)))

(defn- enqueue-to-chan
[c]
(fn [v]
;; Close the channel after the value has been put into channel c
;; to make sure it isn't closed before the value has been submitted.
(a/put! c v (fn [_] (a/close! c)))
scala/unit))

(defn future->chan
"Enqueues the value or Throwable that a Future is defined with to a channel.
If no chan is provided a new `promise-chan` will be created and returned.

*Arguments*:

* `f`: a Future
* `c`: (optional) a core.async chan

*Returns*:

The chan to which the result of Future `f` will be enqueued.

See the helper fns [[<??]] & [[<?]] to take a value fro a chan and throw
it if it's an instance of `Throwable`."
([^Future f]
(future->chan f (a/promise-chan)))
([^Future f c]
(f/on-success* f (enqueue-to-chan c))
(f/on-failure* f (enqueue-to-chan c))
c))
17 changes: 17 additions & 0 deletions core.async/test/finagle_clojure/core_async_test.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
(ns finagle-clojure.core-async-test
(:require [finagle-clojure.core-async :refer :all]
[finagle-clojure.futures :as f]
[clojure.core.async :as a]
[midje.sweet :refer :all]))

(facts "future->chan"
(<?? (future->chan (f/exception (Exception.)))) => (throws Exception)
(<?? (future->chan (f/value :value))) => :value
(a/<!! (a/go (<? (future->chan (f/value :value))))) => :value
(let [c (a/chan 1)
e (Exception.)]
(a/<!! (future->chan (f/exception e) c)) => e
(a/<!! c) => nil)
(let [c (a/chan 1)]
(a/<!! (future->chan (f/value :value) c)) => :value
(a/<!! c) => nil))