-
Notifications
You must be signed in to change notification settings - Fork 19
/
writer.go
131 lines (107 loc) · 3.63 KB
/
writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package wire
import (
"context"
"errors"
"github.com/jeroenrinzema/psql-wire/pkg/buffer"
"github.com/jeroenrinzema/psql-wire/pkg/types"
)
// DataWriter is the writer interface used to send columns and data rows to the
// connected client over the Postgres wire protocol.
type DataWriter interface {
	// Row sends one data row to the underlying Postgres client. Every element
	// of values holds the value of a single column, so the slice must be
	// exactly as long as the list of defined columns. The column headers have
	// to be written before any row is sent. A nil element is encoded as NULL.
	Row(values []any) error

	// Written reports the number of rows written to the client.
	Written() uint64

	// Empty tells the client that the response is empty and that no data rows
	// should be expected.
	Empty() error

	// Complete tells the client that the command has been completed and that
	// no further data should be expected.
	//
	// See [CommandComplete] for the expected format for different queries.
	//
	// [CommandComplete]: https://www.postgresql.org/docs/current/protocol-message-formats.html#PROTOCOL-MESSAGE-FORMATS-COMMANDCOMPLETE
	Complete(description string) error
}
// Sentinel errors returned by the data writer.
var (
	// ErrDataWritten is returned when an empty result is attempted to be sent
	// to the client while data has already been written.
	ErrDataWritten = errors.New("data has already been written")

	// ErrClosedWriter is returned when the data writer has been closed.
	ErrClosedWriter = errors.New("closed writer")
)
// NewDataWriter builds a data writer that keeps hold of the given context,
// columns, format codes and buffer writer. The returned writer is not safe for
// concurrent use: accessing it from multiple goroutines without proper
// synchronization can result in unexpected behavior and data corruption.
func NewDataWriter(ctx context.Context, columns Columns, formats []FormatCode, writer *buffer.Writer) DataWriter {
	dw := new(dataWriter)
	dw.ctx = ctx
	dw.columns = columns
	dw.formats = formats
	dw.client = writer
	return dw
}
// dataWriter is an implementation of the DataWriter interface.
type dataWriter struct {
	// ctx is passed along to the column Define and Write calls.
	ctx context.Context
	// columns holds the column definitions used by Define and Row.
	columns Columns
	// formats holds the format codes passed to the column Define and Write calls.
	formats []FormatCode
	// client is the buffer writer that all messages are written to.
	client *buffer.Writer
	// closed is set by close; once true, Define, Row, Empty and Complete
	// return ErrClosedWriter.
	closed bool
	// written is the row counter maintained by Row and reported by Written.
	written uint64
}
// Define replaces the writer's columns with the given ones and then calls
// Define on them with the writer's context, client buffer and format codes
// (presumably emitting the column headers to the client; confirm against
// Columns.Define). ErrClosedWriter is returned when the writer has already
// been closed.
func (writer *dataWriter) Define(columns Columns) error {
	if writer.closed {
		return ErrClosedWriter
	}

	writer.columns = columns

	ctx, client, formats := writer.ctx, writer.client, writer.formats
	return writer.columns.Define(ctx, client, formats)
}
// Row writes a single data row holding the given values through the writer's
// columns. Each element of values represents one column value.
//
// ErrClosedWriter is returned when the writer has already been closed. Any
// error reported while writing the row is returned as-is, and in that case the
// row is not counted, so Written only reflects rows that were written without
// error.
func (writer *dataWriter) Row(values []any) error {
	if writer.closed {
		return ErrClosedWriter
	}

	if err := writer.columns.Write(writer.ctx, writer.formats, writer.client, values); err != nil {
		return err
	}

	// Count the row only once the write has succeeded. Incrementing up front
	// would make Written over-report after a failed write and would cause a
	// later Empty call to fail with ErrDataWritten although no row was sent.
	writer.written++
	return nil
}
// Empty finishes the writer as an empty response. It returns ErrClosedWriter
// when the writer has already been closed and ErrDataWritten when at least one
// row has been counted. Otherwise the writer is closed and nil is returned;
// this method itself does not write anything to the client buffer.
func (writer *dataWriter) Empty() error {
	switch {
	case writer.closed:
		return ErrClosedWriter
	case writer.written != 0:
		return ErrDataWritten
	}

	writer.close()
	return nil
}
// Written returns the current value of the writer's row counter. It can be
// called regardless of whether the writer has been closed.
func (writer *dataWriter) Written() uint64 {
	return writer.written
}
// Complete sends a command complete message carrying the given description
// and closes the writer. ErrClosedWriter is returned when the writer has
// already been closed.
//
// When columns are set but no row has been counted, the writer first goes
// through Empty; an error from Empty aborts the call before anything is sent.
func (writer *dataWriter) Complete(description string) error {
	if writer.closed {
		return ErrClosedWriter
	}

	noRows := writer.written == 0
	if noRows && writer.columns != nil {
		if err := writer.Empty(); err != nil {
			return err
		}
	}

	// Mark the writer as closed once the completion message has been handled,
	// whether or not sending it succeeded.
	defer writer.close()
	return commandComplete(writer.client, description)
}
// close marks the writer as closed, after which Define, Row, Empty and
// Complete return ErrClosedWriter. It only flips the flag; the client buffer
// is not touched.
func (writer *dataWriter) close() {
	writer.closed = true
}
// commandComplete announces that the requested command has successfully been
// executed. The given description is written back to the client and could be
// used to send additional metadata to the user. The error returned by the
// final End call on the writer is returned to the caller.
func commandComplete(writer *buffer.Writer, description string) error {
	// Begin a message of the CommandComplete type.
	writer.Start(types.ServerCommandComplete)
	// The message body is the description followed by a terminator
	// (presumably a single null byte; confirm against buffer.Writer).
	writer.AddString(description)
	writer.AddNullTerminate()
	// End finishes the message started above and reports any failure.
	return writer.End()
}