Unified Framework

Yves Ineichen edited this page Oct 11, 2016 · 1 revision

A common framework for sketching

We identify the structure common to all sketching transforms implemented in Skylark; this enables us to express sketching as a single, concrete algorithm covering all cases. A number of different streams are defined in the process (realized as vectors of random or deterministic quantities), and they are written down explicitly for each available sketch. Apart from its unifying simplicity in expressing the sketching idea, this approach lets us succinctly express the kernel algorithm under various programming models, still in common notation: message passing and streaming are two notable examples. Performance-wise, specializations under these models are expected to stem from this approach in a disciplined manner.

Sketching Algorithm

A is assumed to be a height x width matrix.

for l in [0, sketch_size):
    k = select_stream[l]
    A_hat[k, :] <- sum_{i in mix_set[k]} mix_factor[k][i] * A[i, :]
    A_hat[k, :] <- post_mix[k](A_hat[k, :])
    A_hat[l, :] <- A_hat[k, :]

This is for columnwise (left) sketching (dimensions: n (= height) -> sketch_size); for rowwise (right) sketching we simply replace A[any, :] with A[:, any], and similarly for the sketched matrix A_hat (and also row(s) with column(s) in what follows).
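The generic columnwise algorithm above can be sketched in plain Python. Everything below is illustrative, not Skylark's actual API: the function name, the dict-based streams, and the CWT-style instantiation (hash bucketing with Rademacher signs, identity post_mix) are assumptions chosen so the example is self-contained.

```python
import numpy as np

def sketch_columnwise(A, sketch_size, select_stream, mix_set, mix_factor, post_mix):
    # Direct transcription of the generic algorithm: mix, post-mix, place.
    height, width = A.shape
    A_hat = np.zeros((sketch_size, width))
    for l in range(sketch_size):
        k = select_stream[l]
        row = np.zeros(width)
        for i in mix_set[k]:
            row += mix_factor[k][i] * A[i, :]
        A_hat[l, :] = post_mix[k](row)
    return A_hat

rng = np.random.default_rng(0)
n, sketch_size = 6, 3
A = rng.standard_normal((n, 4))

# CWT-style instantiation: hash_stream buckets rows, value_stream gives signs.
hash_stream = rng.integers(0, sketch_size, size=n)
value_stream = rng.choice([-1.0, 1.0], size=n)

select_stream = list(range(sketch_size))  # consecutive: select_stream[l] = l
mix_set = {k: [i for i in range(n) if hash_stream[i] == k]
           for k in range(sketch_size)}
mix_factor = {k: value_stream for k in range(sketch_size)}
post_mix = {k: (lambda x: x) for k in range(sketch_size)}  # identity for CWT

A_hat = sketch_columnwise(A, sketch_size, select_stream,
                          mix_set, mix_factor, post_mix)
```

For this hashing instantiation, the result coincides with applying the explicit sparse sketching matrix S (with S[hash_stream[i], i] = value_stream[i]) to A from the left.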

Definitions

mix_set

The set of indices of rows that will "mix" (linearly combine/accumulate) towards a row in A_hat.

mix_set[k] = {i : hash_stream[i] = k}
    hash_transform : CWT, WZT, MMT

mix_set[k] = [0, n) if hash_stream is None
    dense_transform : JLT, CT
    FJLT
    RFT : GaussianRFT, LaplacianRFT
    RFUT

mix_factor

mix_factor[k][i] is the scaling of the ith row of A as it accumulates into the lth row of A_hat (where k = select_stream[l]).

mix_factor[k][i] = value_stream[i]
    hash_transform : CWT, WZT, MMT

mix_factor[k][i] = value_stream[k * sketch_size + i]
    dense_transform : JLT, CT
    RFT : GaussianRFT, LaplacianRFT

mix_factor[k][i] = FUT[i, k] * value_stream[i]
    FJLT (FUT = DFT)
    RFUT

Note: FUT[k, i] for rowwise.

post_mix[k]

A function that further processes the kth row x by independently processing each of its entries x[j] (applied immediately after the mix stage).

post_mix[k](x)[j] = x[j] (identity)
    dense_transform : JLT, CT
    hash_transform : CWT, WZT, MMT
    FJLT
    RFUT

post_mix[k](x)[j] = scale * cos(x[j] / sigma + parameter_stream[k])
    RFT : GaussianRFT, LaplacianRFT
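The RFT-style post_mix stage is easy to write down concretely. In this minimal sketch the normalization `scale = sqrt(2 / sketch_size)` is the usual random-Fourier-features choice and is an assumption here, as are the helper's name and the bandwidth parameter sigma.

```python
import math
import numpy as np

def make_rft_post_mix(sketch_size, sigma, rng):
    # parameter_stream ~ uniform([0, 2*pi)): one random phase per output row k
    parameter_stream = rng.uniform(0.0, 2.0 * math.pi, size=sketch_size)
    scale = math.sqrt(2.0 / sketch_size)  # assumed RFF normalization
    def post_mix(k, x):
        # applied entrywise to the mixed row x
        return scale * np.cos(x / sigma + parameter_stream[k])
    return post_mix

rng = np.random.default_rng(1)
post_mix = make_rft_post_mix(sketch_size=8, sigma=2.0, rng=rng)
row = post_mix(0, np.array([0.5, -1.0, 3.0]))
```

Note that the output is bounded by scale in absolute value, as expected from the cosine.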

value_stream

A stream of real random samples drawn from a distribution. The distribution and the number of entries drawn from it (size) are listed per transform below.

JLT          : value_stream ~ normal, size = sketch_size * n
CT           : value_stream ~ cauchy, size = sketch_size * n
FJLT         : value_stream ~ rademacher, size = n
CWT          : value_stream ~ rademacher, size = n
WZT          : value_stream ~ reciprocal_exponential, size = n (extra parameter: p)
MMT          : value_stream ~ cauchy, size = n
GaussianRFT  : value_stream ~ normal, size = sketch_size * n
LaplacianRFT : value_stream ~ cauchy, size = sketch_size * n
RFUT         : value_stream ~ any, size = n
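The table above reads naturally as a dispatch on the transform name. The helper below is purely illustrative: the mapping onto numpy samplers is an assumption, and the WZT reciprocal-exponential and RFUT "any" cases are omitted because they have no stock numpy generator.

```python
import numpy as np

def make_value_stream(transform, n, sketch_size, rng):
    # Sizes and distributions follow the value_stream table for each sketch.
    dense_size = sketch_size * n
    if transform in ("JLT", "GaussianRFT"):
        return rng.standard_normal(dense_size)      # ~ normal
    if transform in ("CT", "LaplacianRFT"):
        return rng.standard_cauchy(dense_size)      # ~ cauchy
    if transform in ("FJLT", "CWT"):
        return rng.choice([-1.0, 1.0], size=n)      # ~ rademacher
    if transform == "MMT":
        return rng.standard_cauchy(n)               # ~ cauchy
    raise ValueError(f"no sampler wired up for {transform}")

rng = np.random.default_rng(2)
streams = {t: make_value_stream(t, n=10, sketch_size=4, rng=rng)
           for t in ("JLT", "CT", "FJLT", "CWT", "MMT")}
```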

select_stream

A stream of sketch_size integers: either random samples drawn from a distribution, or the consecutive integers (select_stream[l] = l).

select_stream = [0, sketch_size)
    dense_transform : JLT, CT
    hash_transform : CWT, WZT, MMT
    RFT : GaussianRFT, LaplacianRFT

select_stream ~ uniform([0, n))
    FJLT
    RFUT

hash_stream

A stream of n integer random samples drawn from a distribution, or None (used in the definition of mix_set).

hash_stream ~ uniform([0, sketch_size))
    hash_transform : CWT, WZT, MMT

hash_stream = None
    dense_transform : JLT, CT
    RFT : GaussianRFT, LaplacianRFT
    FJLT
    RFUT

parameter_stream

A stream of sketch_size real random samples drawn from a distribution, or None (used in the definition of the function post_mix[k]).

parameter_stream ~ uniform([0, 2*pi))
    RFT : GaussianRFT, LaplacianRFT

parameter_stream = None
    dense_transform : JLT, CT
    FJLT
    RFUT
    hash_transform : CWT, WZT, MMT

Parallel Computation

At this stage it would be helpful to introduce ownership arrays for the matrix elements, meaning which process owns which element. To this end we further assume the typical organization of our p processes in a p = q x r rectangular process grid, so each process can be uniquely identified with its coordinates (s, t) in this grid. There are two basic matrix objects in our analysis, the original matrix A and its sketched counterpart A_hat so it makes sense to define two ownership arrays:

loaders[i, j]

This is the set of processes that should own entry A[i, j] if the matrix is loaded at a preprocessing stage. This set will include at least one process; more processes are permitted in the case of redundancy schemes during the initial data distribution.

processors[l, j]

This is the set of processes that should be responsible for updating entry A_hat[l, j] (sketched matrix). This set will include at least one process; more processes are permitted in the case of redundant computation.

Note that both these ownership arrays are just a notational convenience to contrast the owners of A and A_hat. For a given data distribution scheme there is a single, underlying, generic ownership array owner[i, j] over matrix coordinates (i, j) (with loaders[i, j] = owner[i, j] and processors[l, j] = owner[l, j] in our case). We also define its "inverse":

entries[s, t]

This is the set of matrix entries (i, j) hosted by process (s, t): entries[s,t] = {(i, j) : owner[i, j] = (s, t)}

Note that entries[s, t] (and thus owner[s, t]) depends on two additional entities that were suppressed for convenience in the previous discussion:

  • matrix
  • data distribution scheme

So it would be more complete (but more tedious) to write entries[distribution][matrix][s, t] and owner[distribution][matrix][s, t] instead. In fact loaders[i, j] = owner[any][A][i, j] = owner[A][i, j] and processors[l, j] = owner[any][A_hat][l, j] = owner[A_hat][l, j].
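The owner/entries pair can be made concrete for one simple distribution scheme. The plain 2D block layout below is an assumption chosen for clarity; the distributions actually used by Skylark (via Elemental) differ.

```python
def make_owner(height, width, q, r):
    # owner[i, j] for a plain block layout over a q x r process grid
    block_h = -(-height // q)   # ceil(height / q)
    block_w = -(-width // r)    # ceil(width / r)
    def owner(i, j):
        # process coordinates (s, t) hosting matrix entry (i, j)
        return (i // block_h, j // block_w)
    return owner

def entries(owner, height, width, s, t):
    # the "inverse" of owner: all entries (i, j) hosted by process (s, t)
    return {(i, j) for i in range(height) for j in range(width)
            if owner(i, j) == (s, t)}

owner = make_owner(height=5, width=4, q=2, r=2)
all_entries = [entries(owner, 5, 4, s, t)
               for s in range(2) for t in range(2)]
```

By construction the per-process entry sets partition the index space: every (i, j) is hosted by exactly one process (no redundancy in this simple scheme).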

Dissecting the sketching algorithm, this is a naive plan for computing A_hat[l, j] (at the processes in the processors[l, j] set):

entry_compute(l, j)

k = select_stream[l]
compute mix_set[k] 
for i in mix_set[k]:
    A_hat[l, j] += mix_factor[k][i] * A[i, j]
A_hat[l, j] = post_mix[k](A_hat[l, j])

where select_stream is assumed invertible.

This plan is to be executed for every A_hat[l, j] at process (s,t) with (s,t) in processors[l, j]. So individually each (s,t) process should naively execute the following plan:

process_compute(s, t)

compute entries[s, t]
for (l, j) in entries[s, t]:
    entry_compute(l, j)

Discussion

A few important remarks are in order:

  1. The following quantities can be computed at process (s, t) in communication-free mode:
     a. all streams (value_stream, select_stream, hash_stream, parameter_stream)
     b. entries[s, t]
     c. FUT matrix entries
     d. mix_set (follows from hash_stream), mix_factor (follows from value_stream, FUT), post_mix (follows from parameter_stream)

  2. In order to update an entry in column j in the sketched matrix A_hat, only entries of the original matrix A in the same column j are needed.

The validity of 1a follows directly from the property that all streams index, in communication-free mode, into different parts of a common stream; 1b follows from existing formulae for matrix data distribution given its type. Also, 1c is a consequence of FUT values being computed as a function of their entry coordinates.

These remarks indicate that all a process (s, t) needs to perform columnwise sketching are the input matrix entries in the same columns j (as available locally for the same-width/different-height sketched matrix), at rows i in the union of the mix_set[k] (for k = select_stream[l], over all l appearing as first coordinates of the pairs in entries[s, t]) - with the understanding that all i, j, k, l related index sets can be computed in communication-free mode.

Let's now exploit these remarks for parallel sketching both in streaming and message passing configurations.

Parallel streaming

In a simple scenario, let's assume that processes are required to host the sketched matrix A_hat in a given data distribution and can independently read from file each entry A[u, v] of the original matrix for known u, v only once (single pass). The order of (u, v) index pairs can be made implicit if we additionally assume that the matrix data in the file is organized in a standard one-dimensional traversal indexing (column-major or row-major), but this is not necessary. Here is a prototype listing of computation activities at process (s, t) in this setting:

parallel_streaming(s, t)

compute entries[s, t]
j_set = {j for (l, j) in entries[s, t]}
for index in [0, height * width):
    read A[u, v]
    if v in j_set:
        k_set = {k : u in mix_set[k]}
        for k in k_set:
            l = invert[select_stream][k]
            A_hat[l, v] += mix_factor[k][u] * A[u, v]
for (l, j) in entries[s, t]:
    A_hat[l, j] = post_mix[select_stream[l]](A_hat[l, j])

where the notation x = invert[f][y] is used for "inverting" y = f[x] (for an invertible mapping f); here select_stream is assumed invertible.
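The invert[f] notation admits a one-line realization when the invertible mapping f is stored as a sequence (f[x] = y). The helper name and the sample select_stream below are hypothetical.

```python
def invert(stream):
    # builds the inverse mapping of y = stream[x]; rejects non-invertible streams
    inv = {}
    for x, y in enumerate(stream):
        if y in inv:
            raise ValueError("stream is not invertible")
        inv[y] = x
    return inv

select_stream = [2, 0, 1]      # hypothetical invertible select_stream
inv = invert(select_stream)    # so inv[select_stream[l]] == l for every l
```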

Message passing

In parallel streaming there is only one provider entity: the input matrix file that is sequentially accessed by the computing processes. In the message passing paradigm, input data are assumed loaded/distributed by the processes. So a process (s,t) both provides (parts of A) and computes (parts of A_hat). Therefore it conceptually needs to perform the following:

message_passing(s, t)

# send
compute entries[A][s, t]
for (i, j) in entries[A][s, t]:
    k_set = {k : i in mix_set[k]}
    for k in k_set:
        l = invert[select_stream][k]
        send A[i, j] to owner[A_hat][l, j] (= processors[l, j])

# receive and compute
compute entries[A_hat][s, t]
for (l, j) in entries[A_hat][s, t]:
    k = select_stream[l]
    compute mix_set[k]
    for i in mix_set[k]:
        receive A[i, j] from owner[A][i, j] (= loaders[i, j])
        A_hat[l, j] += mix_factor[k][i] * A[i, j]
for (l, j) in entries[A_hat][s, t]:
    A_hat[l, j] = post_mix[select_stream[l]](A_hat[l, j])