Skip to content

Commit

Permalink
Allow asynchronous queries to DB
Browse files Browse the repository at this point in the history
Uses future package
  • Loading branch information
quartin committed Aug 13, 2018
1 parent 1ee9209 commit 7433d8c
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 20 deletions.
10 changes: 6 additions & 4 deletions DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Package: dbutils
Type: Package
Title: A set of utility functions when dealing with databases in R.
Version: 0.1.0
Title: A set of utility functions for dealing with databases in R
Version: 0.2.0
Maintainer: Susana Quartin <susanaquartin@gmail.com>
Description: More about what it does (maybe more than one line)
Use four spaces when indenting paragraphs within the Description.
Expand All @@ -13,11 +13,13 @@ Authors@R: c(
License: GPL (>= 2)
Encoding: UTF-8
LazyData: true
RoxygenNote: 6.0.1
RoxygenNote: 6.1.0
Depends:
R (>= 3.1.2)
Imports:
DBI,
odbc,
dbplyr,
dplyr,
readr
readr,
future
7 changes: 6 additions & 1 deletion NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,12 @@ export(fetch_db)
importFrom(DBI,dbGetQuery)
importFrom(DBI,sqlInterpolate)
importFrom(dbplyr,in_schema)
importFrom(dplyr,"%>%")
importFrom(dplyr,as_tibble)
importFrom(dplyr,tbl)
importFrom(future,future)
importFrom(future,multiprocess)
importFrom(future,plan)
importFrom(odbc,dbConnect)
importFrom(odbc,dbDisconnect)
importFrom(odbc,odbc)
importFrom(readr,read_file)
62 changes: 52 additions & 10 deletions R/fetch_db.R
Original file line number Diff line number Diff line change
@@ -1,29 +1,71 @@
#' @export
#' @title Fetch data from db
#'
#' @param conn A database connection.
#' @param path A sql file path.
#' @param conn A database connection. Only necessary if `async = FALSE`.
#' @param path A SQL file path. Optionally, use `query`.
#' @param query A SQL string. Optionally, use `path`. If both are supplied, `path`` is used.
#' @param async A logical indicating whether to execute the query asynchronously or (default) not.
#' @param dsn The Data Source Name.
#' @param ... Named values to interpolate into the query string.
#'
#' @return
#' A data frame with as many rows as records and as many
#' If processed in a synchronous manner (the default), a data frame with as many rows as records and as many
#' columns as fields in the result set.
#' If processed in an asynchronous manner, a Future. To get the value of a future, use future::value.
#' In case of error, a print of the full SQL error is
#' returned before execution of error action.
fetch_db <- function(conn = NULL, path = NULL, query = NULL, async = FALSE, dsn = NULL, ...) {
if (async) {
if(!missing(conn)) {
message("Asynchronous execution flag set to TRUE but a connection was provided. Connection will not be used.")
}
fetch_async(path, query, dsn, ...)
} else {
fetch_default(conn, path, query, ...)
}
}

#' @title Fetch init
#'
#' @importFrom readr read_file
#' @importFrom DBI sqlInterpolate dbGetQuery
#' @importFrom dplyr "%>%" as_tibble
fetch_db <- function(conn, path, ...) {
query_string <- read_file(path) %>%
sqlInterpolate(conn, ., ...)
#' @importFrom DBI sqlInterpolate
fetch_init <- function(conn, path, query, ...) {
query_str <- if (!is.null(path)) {
readr::read_file(path)
} else if (!is.null(query)) {
query
} else {
stop("Both path and query are empty.")
}
if (missing(...)) query_str else sqlInterpolate(conn, query_str, ...)
}

#' @title Fetch default
#'
#' @importFrom DBI dbGetQuery
#' @importFrom dplyr as_tibble
fetch_default <- function(conn, path, query, ...) {
query_str <- fetch_init(conn, path, query, ...)

tryCatch({
dbGetQuery(conn, query_string) %>%
as_tibble()
as_tibble(dbGetQuery(conn, query_str))
},
error = function(e) {
print(e$message)
stop(e)
})
}

#' @importFrom future future plan multiprocess
#' @importFrom odbc dbDisconnect dbConnect odbc
fetch_async <- function(path, query, dsn, ...) {
if(is.null(dsn)) stop("Missing dsn string")
future::plan(future::multiprocess)

future({
con <- odbc::dbConnect(odbc::odbc(), dsn)
out <- fetch_default(con, path, query, ...)
odbc::dbDisconnect(con)
out
})
}
23 changes: 18 additions & 5 deletions man/fetch_db.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions man/fetch_default.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions man/fetch_init.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 7433d8c

Please sign in to comment.