-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathStreamStore.js
131 lines (114 loc) · 3.54 KB
/
StreamStore.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import N3Parser from '@rdfjs/parser-n3'
import toNT from '@rdfjs/to-ntriples'
import TripleToQuadTransform from 'rdf-transform-triple-to-quad'
import { Transform } from 'readable-stream'
import asyncToReadabe from './lib/asyncToReadabe.js'
import checkResponse from './lib/checkResponse.js'
import mergeHeaders from './lib/mergeHeaders.js'
/**
* A store implementation that parses and serializes SPARQL Graph Store responses and requests into/from Readable
* streams.
*/
class StreamStore {
/**
* @param {Object} options
* @param {SimpleClient} options.client client that provides the HTTP I/O
*/
constructor ({ client }) {
this.client = client
}
/**
* Sends a GET request to the Graph Store
*
* @param {NamedNode} [graph] source graph
* @return {Promise<Readable>}
*/
get (graph) {
return this.read({ method: 'GET', graph })
}
/**
* Sends a POST request to the Graph Store
*
* @param {Readable} stream triples/quads to write
* @param {Object} [options]
* @param {Term} [options.graph] target graph
* @return {Promise<void>}
*/
async post (stream, { graph } = {}) {
return this.write({ graph, method: 'POST', stream })
}
/**
* Sends a PUT request to the Graph Store
*
* @param {Readable} stream triples/quads to write
* @param {Object} [options]
* @param {Term} [options.graph] target graph
* @return {Promise<void>}
*/
async put (stream, { graph } = {}) {
return this.write({ graph, method: 'PUT', stream })
}
/**
* Generic read request to the Graph Store
*
* @param {Object} [options]
* @param {Term} [options.graph] source graph
* @param {string} options.method HTTP method
* @returns {Readable}
*/
read ({ graph, method }) {
return asyncToReadabe(async () => {
const url = new URL(this.client.storeUrl)
if (graph && graph.termType !== 'DefaultGraph') {
url.searchParams.append('graph', graph.value)
} else {
url.searchParams.append('default', '')
}
const res = await this.client.fetch(url, {
method,
headers: mergeHeaders(this.client.headers, { accept: 'application/n-triples' })
})
await checkResponse(res)
const parser = new N3Parser({ factory: this.client.factory })
const tripleToQuad = new TripleToQuadTransform(graph, { factory: this.client.factory })
return parser.import(res.body).pipe(tripleToQuad)
})
}
/**
* Generic write request to the Graph Store
*
* @param {Object} [options]
* @param {Term} [graph] target graph
* @param {string} method HTTP method
* @param {Readable} stream triples/quads to write
* @returns {Promise<void>}
*/
async write ({ graph, method, stream }) {
const url = new URL(this.client.storeUrl)
if (graph && graph.termType !== 'DefaultGraph') {
url.searchParams.append('graph', graph.value)
} else {
url.searchParams.append('default', '')
}
const serialize = new Transform({
writableObjectMode: true,
transform (quad, encoding, callback) {
const triple = {
subject: quad.subject,
predicate: quad.predicate,
object: quad.object,
graph: { termType: 'DefaultGraph' }
}
callback(null, `${toNT(triple)}\n`)
}
})
const res = await this.client.fetch(url, {
method,
headers: mergeHeaders(this.client.headers, { 'content-type': 'application/n-triples' }),
body: stream.pipe(serialize),
duplex: 'half'
})
await checkResponse(res)
}
}
export default StreamStore