From f52eaae10cfc0e66e354eaab9a94fddb1b07a162 Mon Sep 17 00:00:00 2001 From: Nikita Prokopov Date: Tue, 24 Oct 2023 10:33:09 +0100 Subject: [PATCH] Sync: initial --- deps.edn | 1 + docs/sync.md | 51 +++++++++++++++++++ project.clj | 3 +- src/datascript/db.cljc | 4 ++ src/datascript/sync/client.cljc | 73 +++++++++++++++++++++++++++ src/datascript/sync/server.cljc | 54 ++++++++++++++++++++ test/datascript/test.cljc | 1 + test/datascript/test/core.cljc | 8 +++ test/datascript/test/sync.clj | 87 +++++++++++++++++++++++++++++++++ 9 files changed, 281 insertions(+), 1 deletion(-) create mode 100644 docs/sync.md create mode 100644 src/datascript/sync/client.cljc create mode 100644 src/datascript/sync/server.cljc create mode 100644 test/datascript/test/sync.clj diff --git a/deps.edn b/deps.edn index d8afee1f..f8f3178a 100644 --- a/deps.edn +++ b/deps.edn @@ -37,6 +37,7 @@ cheshire/cheshire {:mvn/version "5.10.0"} com.cognitect/transit-clj {:mvn/version "1.0.324"} com.cognitect/transit-cljs {:mvn/version "0.8.269"} + org.clojure/core.async {:mvn/version "1.6.681"} } } diff --git a/docs/sync.md b/docs/sync.md new file mode 100644 index 00000000..8e499406 --- /dev/null +++ b/docs/sync.md @@ -0,0 +1,51 @@ +Server assigns id: + +- Changes when client reconnects + +Client assigns id: + +- What to do with client before :catch-up message? +- Where to store send-fn? +- Can track how far each client is. Why? + + +``` +tx :: {:tx-data [...] + :tx-id + :server-idx } +``` + +# Client connects to a server + +``` +SND {:message :catching-up + :patterns [ ...] + :server-idx ?} + +RCV {:message :catched-up + :snapshot + :server-idx } + +or + +RCV {:message :catched-up + :txs [ ...]} +``` + +# Client makes a transaction + +``` +SND {:message :transacting + :server-idx server-idx + :txs [{:tx-data ... + :tx-id ...} ...]} +``` + +# Server broadcasts a transaction + +``` +RCV {:message :transacted + :tx-data ... + :tx-id ... + :server-idx ...} ...]} +``` diff --git a/project.clj b/project.clj index 5c5a7377..7a53750e 100644 --- a/project.clj +++ b/project.clj @@ -85,7 +85,8 @@ :test {:dependencies [[metosin/jsonista "0.3.3"] [cheshire "5.10.0"] [com.cognitect/transit-clj "1.0.324"] - [com.cognitect/transit-cljs "0.8.269"]]} + [com.cognitect/transit-cljs "0.8.269"] + [org.clojure/core.async "1.6.681"]]} :bench {:dependencies [[criterium "0.4.6"] [metosin/jsonista "0.3.3"] [com.clojure-goes-fast/clj-async-profiler "0.5.1"]]} diff --git a/src/datascript/db.cljc b/src/datascript/db.cljc index 5ed8f4f1..69fc4ecf 100644 --- a/src/datascript/db.cljc +++ b/src/datascript/db.cljc @@ -1779,3 +1779,7 @@ :else (raise "Bad entity type at " entity ", expected map or vector" {:error :transact/syntax, :tx-data entity}))))) + +(defn tx-from-datoms [datoms] + (mapv #(vector (if (:added %) :db/add :db/retract) (:e %) (:a %) (:v %)) datoms)) + \ No newline at end of file diff --git a/src/datascript/sync/client.cljc b/src/datascript/sync/client.cljc new file mode 100644 index 00000000..9b895fc3 --- /dev/null +++ b/src/datascript/sync/client.cljc @@ -0,0 +1,73 @@ +(ns datascript.sync.client + (:require + [datascript.conn :as conn] + [datascript.db :as db] + [datascript.serialize :as serialize])) + +(defn client-id [] + (long (* (rand) 9007199254740991))) + +(def *last-tx-id + (atom 0)) + +(defn new-tx-id [client-id] + [client-id (swap! *last-tx-id inc)]) + +(defn on-tx [conn report] + (when-not (:server? (:tx-meta report)) + (let [{:keys [client-id send-fn server-idx pending]} (meta conn) + tx {:tx-data (db/tx-from-datoms (:tx-data report)) + :tx-id (new-tx-id client-id)}] + (send-fn + {:message :transacting + :server-idx @server-idx + :txs [tx]}) + (swap! pending conj tx)))) + +(defn create-conn [patterns send-fn] + (let [res (atom nil :meta + {:client-id (client-id) + :server-db (atom nil) + :pending (atom #?(:clj clojure.lang.PersistentQueue/EMPTY + :cljs cljs.core.PersistentQueue.EMPTY)) + :server-idx (atom nil) + :send-fn send-fn + :listeners (atom {})})] + (send-fn {:message :catching-up}) + res)) + +(defn server-message [conn body] + (case (:message body) + :catched-up + (let [{:keys [snapshot server-idx]} body + db (serialize/from-serializable snapshot)] + (reset! conn db) + (reset! (:server-db (meta conn)) db) + (reset! (:server-idx (meta conn)) server-idx) + (conn/listen! conn :sync #(on-tx conn %))) + + :transacted + (let [{:keys [tx-data tx-id server-idx]} body + {*server-db :server-db + *server-idx :server-idx + *pending :pending + *listeners :listeners} (meta conn) + report (conn/with @*server-db tx-data {:server? true}) + server-db' (:db-after report)] + (reset! *server-db server-db') + (reset! *server-idx server-idx) + (if (= tx-id (:tx-id (peek @*pending))) + (swap! *pending pop) + (do + (reset! conn (reduce conn/db-with server-db' @*pending)) + (doseq [[_ callback] @*listeners] + (callback report)))))) + nil) + +(defn server-disconnected [conn] + ;; TODO impl me + ) + +(defn server-connected [conn] + ;; TODO impl me + ) diff --git a/src/datascript/sync/server.cljc b/src/datascript/sync/server.cljc new file mode 100644 index 00000000..14c2bab7 --- /dev/null +++ b/src/datascript/sync/server.cljc @@ -0,0 +1,54 @@ +(ns datascript.sync.server + (:require + [datascript.conn :as conn] + [datascript.db :as db] + [datascript.serialize :as serialize])) + +(defn- client [conn channel] + (get @(:clients (meta conn)) channel)) + +(defn on-tx [conn report] + ;; TODO filter what to send where + (let [msg {:message :transacted + :tx-data (db/tx-from-datoms (:tx-data report)) + :tx-id (:tx-id (:tx-meta report)) + :server-idx (:db/current-tx (:tempids report))}] + (doseq [[channel {:keys [status send-fn]}] @(:clients (meta conn)) + ; :let [_ (prn "broadcasting to" channel status)] + :when (= :active status)] + (send-fn channel msg)))) + +(defn client-connected [conn channel send-fn] + (let [*clients (:clients (meta conn)) + clients' (swap! *clients assoc channel + {:status :connected + :send-fn send-fn})] + (when (= 1 (count clients')) + (conn/listen! conn :sync #(on-tx conn %))) + nil)) + +(defn client-message [conn channel body] + (case (:message body) + :catching-up + (let [{:keys [patterns server-idx]} body ;; TODO delta from server-idx + {:keys [send-fn]} (client conn channel) + db @conn] + (send-fn channel + {:message :catched-up + :snapshot (serialize/serializable db) ;; TODO patterns + :server-idx (:max-tx db)}) + ;; TODO race - external txs between (:max-tx db) and after :status :active + (swap! (:clients (meta conn)) update channel assoc :status :active)) + + :transacting + (doseq [{:keys [tx-data tx-id]} (:txs body)] + ;; TODO handle exception here + (conn/transact! conn tx-data {:tx-id tx-id}))) + nil) + +(defn client-disconnected [conn channel] + (let [*clients (:clients (meta conn)) + clients' (swap! *clients dissoc channel)] + (when (= 0 (count clients')) + (conn/unlisten! conn :sync)) + nil)) diff --git a/test/datascript/test.cljc b/test/datascript/test.cljc index dc712b0e..085884b2 100644 --- a/test/datascript/test.cljc +++ b/test/datascript/test.cljc @@ -36,6 +36,7 @@ datascript.test.query-v3 datascript.test.serialize #?(:clj datascript.test.storage) + #?(:clj datascript.test.sync) datascript.test.transact datascript.test.tuples datascript.test.validation diff --git a/test/datascript/test/core.cljc b/test/datascript/test/core.cljc index 75922f5a..e1fee50d 100644 --- a/test/datascript/test/core.cljc +++ b/test/datascript/test/core.cljc @@ -97,6 +97,14 @@ #?(:clj (transit-read (.getBytes ^String s "UTF-8") :json) :cljs (transit-read s :json))) +#?(:clj + (def lock (Object.))) + +(defn log [& args] + #?(:clj (locking lock + (apply println args)) + :cljs (apply println args))) + ;; Core tests (deftest test-protocols diff --git a/test/datascript/test/sync.clj b/test/datascript/test/sync.clj new file mode 100644 index 00000000..f9b7c6f5 --- /dev/null +++ b/test/datascript/test/sync.clj @@ -0,0 +1,87 @@ +(ns datascript.test.sync + (:require + [clojure.core.async :as async :refer [>! ! ch [:c1 (freeze %)]))) + c2 (client/create-conn nil #(go (>! ch [:c2 (freeze %)])))] + (server/client-connected server :c1 (fn [_ msg] (go (>! ch1 (freeze msg))))) + (server/client-connected server :c2 (fn [_ msg] (go (>! ch2 (freeze msg))))) + (go-loop [] + (when-some [msg (