Skip to content

Commit b1e4a88

Browse files
authored
Merge pull request #52 from GiGainfosystems/multiconnection_work
Multiconnection fixes
2 parents 1607444 + ae7d97f commit b1e4a88

File tree

8 files changed

+227
-104
lines changed

8 files changed

+227
-104
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,4 +54,4 @@ dynamic-schema = ["diesel-dynamic-schema"]
5454
gst = []
5555

5656
[patch.crates-io]
57-
diesel = { git = "https://github.com/weiznich/diesel", rev = "e632a7ca4fa12b76d7638392aeaff7522f57adef" }
57+
diesel = { git = "https://github.com/weiznich/diesel", rev = "548e0d73a0f2c207f1abf03f8f6741f5a9b1cb0b" }

src/oracle/connection/bind_collector.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,16 @@ impl<'a> BindCollector<'a, Oracle> for OracleBindCollector<'a> {
7070

7171
Ok(())
7272
}
73+
74+
fn push_null_value(
75+
&mut self,
76+
metadata: <Oracle as diesel::sql_types::TypeMetadata>::TypeMetadata,
77+
) -> diesel::prelude::QueryResult<()> {
78+
let len = self.binds.len();
79+
self.binds
80+
.push((format!("in{}", len), BindValue::NotSet(metadata.tpe)));
81+
Ok(())
82+
}
7383
}
7484

7585
impl<'a, T> From<T> for BindValue<'a>

src/oracle/connection/mod.rs

Lines changed: 180 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ use super::backend::Oracle;
99
use super::query_builder::OciQueryBuilder;
1010
use super::OciDataType;
1111
use crate::oracle::connection::stmt_iter::RowIter;
12+
use diesel::connection::Instrumentation;
13+
use diesel::connection::InstrumentationEvent;
1214
use diesel::connection::{Connection, SimpleConnection, TransactionManager};
1315
use diesel::connection::{LoadConnection, MultiConnectionHelper};
1416
use diesel::deserialize::FromSql;
@@ -146,6 +148,7 @@ mod transaction;
146148
pub struct OciConnection {
147149
raw: oracle::Connection,
148150
transaction_manager: OCITransactionManager,
151+
instrumentation: Option<Box<dyn Instrumentation>>,
149152
}
150153

151154
struct ErrorHelper(oracle::Error);
@@ -238,8 +241,23 @@ unsafe impl Send for OciConnection {}
238241

239242
impl SimpleConnection for OciConnection {
240243
fn batch_execute(&mut self, query: &str) -> QueryResult<()> {
241-
self.raw.execute(query, &[]).map_err(ErrorHelper::from)?;
242-
Ok(())
244+
self.instrumentation
245+
.on_connection_event(InstrumentationEvent::start_query(
246+
&diesel::connection::StrQueryHelper::new(query),
247+
));
248+
let r = self
249+
.raw
250+
.execute(query, &[])
251+
.map_err(ErrorHelper::from)
252+
.map_err(Into::into)
253+
.map(|_| ());
254+
self.instrumentation
255+
.on_connection_event(InstrumentationEvent::finish_query(
256+
&diesel::connection::StrQueryHelper::new(query),
257+
r.as_ref().err(),
258+
));
259+
260+
r
243261
}
244262
}
245263

@@ -251,54 +269,20 @@ impl Connection for OciConnection {
251269
/// should be a valid connection string for a given backend. See the
252270
/// documentation for the specific backend for specifics.
253271
fn establish(database_url: &str) -> ConnectionResult<Self> {
254-
let url = url::Url::parse(database_url)
255-
.map_err(|_| ConnectionError::InvalidConnectionUrl("Invalid url".into()))?;
256-
if url.scheme() != "oracle" {
257-
return Err(ConnectionError::InvalidConnectionUrl(format!(
258-
"Got a unsupported url scheme: {}",
259-
url.scheme()
260-
)));
261-
}
262-
let user = url.username();
263-
264-
if user.is_empty() {
265-
return Err(ConnectionError::InvalidConnectionUrl(
266-
"Username not set".into(),
267-
));
268-
}
269-
let user = match percent_encoding::percent_decode_str(url.username()).decode_utf8() {
270-
Ok(username) => username,
271-
Err(_e) => {
272-
return Err(ConnectionError::InvalidConnectionUrl(
273-
"Username could not be percent decoded".into(),
274-
))
275-
}
276-
};
277-
let password = url
278-
.password()
279-
.ok_or_else(|| ConnectionError::InvalidConnectionUrl("Password not set".into()))?;
280-
281-
let host = url
282-
.host_str()
283-
.ok_or_else(|| ConnectionError::InvalidConnectionUrl("Hostname not set".into()))?;
284-
let port = url.port();
285-
let path = url.path();
286-
287-
let mut url = host.to_owned();
288-
if let Some(port) = port {
289-
write!(url, ":{}", port).expect("Write to string does not fail");
290-
}
291-
url += path;
292-
293-
let mut raw = oracle::Connection::connect(user, password, url)
294-
.map_err(ErrorHelper::from)
295-
.map_err(|e| ConnectionError::CouldntSetupConfiguration(e.into()))?;
296-
297-
raw.set_autocommit(true);
272+
let mut instrumentation = diesel::connection::get_default_instrumentation();
273+
instrumentation.on_connection_event(InstrumentationEvent::start_establish_connection(
274+
database_url,
275+
));
276+
let raw = Self::inner_establish(database_url);
277+
instrumentation.on_connection_event(InstrumentationEvent::finish_establish_connection(
278+
database_url,
279+
raw.as_ref().err(),
280+
));
298281

299282
Ok(Self {
300-
raw,
283+
raw: raw?,
301284
transaction_manager: OCITransactionManager::new(),
285+
instrumentation,
302286
})
303287
}
304288

@@ -307,35 +291,17 @@ impl Connection for OciConnection {
307291
where
308292
T: QueryFragment<Self::Backend> + QueryId,
309293
{
310-
let mut qb = OciQueryBuilder::default();
311-
312-
source.to_sql(&mut qb, &Oracle)?;
313-
314-
let conn = &self.raw;
315-
let sql = qb.finish();
316-
let mut stmt = conn.statement(&sql);
317-
if !source.is_safe_to_cache_prepared(&Oracle)? {
318-
stmt.exclude_from_cache();
319-
}
320-
let mut stmt = stmt.build().map_err(ErrorHelper::from)?;
321-
let mut bind_collector = OracleBindCollector::default();
322-
323-
source.collect_binds(&mut bind_collector, &mut (), &Oracle)?;
324-
let binds = bind_collector
325-
.binds
326-
.iter()
327-
.map(|(n, b)| -> (&str, &dyn oracle::sql_type::ToSql) {
328-
(n as &str, std::ops::Deref::deref(b))
329-
})
330-
.collect::<Vec<_>>();
331-
332-
if stmt.is_query() {
333-
stmt.query_named(&binds).map_err(ErrorHelper::from)?;
334-
} else {
335-
stmt.execute_named(&binds).map_err(ErrorHelper::from)?;
336-
}
337-
338-
Ok(stmt.row_count().map_err(ErrorHelper::from)? as usize)
294+
self.instrumentation
295+
.on_connection_event(InstrumentationEvent::start_query(&diesel::debug_query(
296+
source,
297+
)));
298+
let res = self.inner_executing_returning_count(source);
299+
self.instrumentation
300+
.on_connection_event(InstrumentationEvent::finish_query(
301+
&diesel::debug_query(source),
302+
res.as_ref().err(),
303+
));
304+
res
339305
}
340306

341307
fn transaction_state(
@@ -357,6 +323,14 @@ impl Connection for OciConnection {
357323
self.transaction_manager.is_test_transaction = true;
358324
Ok(())
359325
}
326+
327+
fn instrumentation(&mut self) -> &mut dyn diesel::connection::Instrumentation {
328+
&mut self.instrumentation
329+
}
330+
331+
fn set_instrumentation(&mut self, instrumentation: impl diesel::connection::Instrumentation) {
332+
self.instrumentation = Some(Box::new(instrumentation));
333+
}
360334
}
361335

362336
impl LoadConnection for OciConnection {
@@ -370,8 +344,11 @@ impl LoadConnection for OciConnection {
370344
Self::Backend: QueryMetadata<T::SqlType>,
371345
{
372346
let query = source.as_query();
373-
374-
self.with_prepared_statement(query, |mut stmt, bind_collector| {
347+
self.instrumentation
348+
.on_connection_event(InstrumentationEvent::start_query(&diesel::debug_query(
349+
&query,
350+
)));
351+
let res = self.with_prepared_statement(&query, |mut stmt, bind_collector| {
375352
if stmt.is_query() {
376353
let binds = bind_collector
377354
.binds
@@ -394,7 +371,13 @@ impl LoadConnection for OciConnection {
394371
} else {
395372
unreachable!()
396373
}
397-
})
374+
});
375+
self.instrumentation
376+
.on_connection_event(InstrumentationEvent::finish_query(
377+
&diesel::debug_query(&query),
378+
res.as_ref().err(),
379+
));
380+
res
398381
}
399382
}
400383

@@ -421,7 +404,7 @@ where
421404
impl OciConnection {
422405
fn with_prepared_statement<'conn, 'query, T, R>(
423406
&'conn mut self,
424-
query: T,
407+
query: &T,
425408
callback: impl FnOnce(oracle::Statement<'conn>, OracleBindCollector) -> QueryResult<R>,
426409
) -> Result<R, Error>
427410
where
@@ -632,36 +615,131 @@ impl OciConnection {
632615
});
633616

634617
if let Some(first_record) = record_iter.next() {
635-
let mut qb = OciQueryBuilder::default();
636-
first_record.to_sql(&mut qb, &Oracle)?;
637-
let query_string = qb.finish();
638-
let mut batch = self
639-
.raw
640-
.batch(&query_string, record_count)
641-
.build()
642-
.map_err(ErrorHelper::from)?;
618+
self.instrumentation
619+
.on_connection_event(InstrumentationEvent::start_query(&diesel::debug_query(
620+
&first_record,
621+
)));
622+
let res = self.inner_batch_insert(&first_record, record_count, record_iter);
623+
self.instrumentation
624+
.on_connection_event(InstrumentationEvent::finish_query(
625+
&diesel::debug_query(&first_record),
626+
res.as_ref().err(),
627+
));
628+
res
629+
} else {
630+
Ok(0)
631+
}
632+
}
633+
634+
fn inner_batch_insert<Q>(
635+
&mut self,
636+
first_record: &Q,
637+
record_count: usize,
638+
record_iter: impl Iterator<Item = Q>,
639+
) -> Result<usize, Error>
640+
where
641+
Q: QueryFragment<Oracle>,
642+
{
643+
let mut qb = OciQueryBuilder::default();
644+
first_record.to_sql(&mut qb, &Oracle)?;
645+
let query_string = qb.finish();
646+
let mut batch = self
647+
.raw
648+
.batch(&query_string, record_count)
649+
.build()
650+
.map_err(ErrorHelper::from)?;
651+
652+
bind_params_to_batch(first_record, &mut batch)?;
653+
for record in record_iter {
654+
bind_params_to_batch(&record, &mut batch)?;
655+
}
656+
batch.execute().map_err(ErrorHelper::from)?;
657+
Ok(record_count)
658+
}
643659

644-
bind_params_to_batch(first_record, &mut batch)?;
645-
for record in record_iter {
646-
bind_params_to_batch(record, &mut batch)?;
660+
fn inner_establish(database_url: &str) -> Result<oracle::Connection, ConnectionError> {
661+
let url = url::Url::parse(database_url)
662+
.map_err(|_| ConnectionError::InvalidConnectionUrl("Invalid url".into()))?;
663+
if url.scheme() != "oracle" {
664+
return Err(ConnectionError::InvalidConnectionUrl(format!(
665+
"Got a unsupported url scheme: {}",
666+
url.scheme()
667+
)));
668+
}
669+
let user = url.username();
670+
if user.is_empty() {
671+
return Err(ConnectionError::InvalidConnectionUrl(
672+
"Username not set".into(),
673+
));
674+
}
675+
let user = match percent_encoding::percent_decode_str(url.username()).decode_utf8() {
676+
Ok(username) => username,
677+
Err(_e) => {
678+
return Err(ConnectionError::InvalidConnectionUrl(
679+
"Username could not be percent decoded".into(),
680+
))
647681
}
648-
batch.execute().map_err(ErrorHelper::from)?;
649-
Ok(record_count)
682+
};
683+
let password = url
684+
.password()
685+
.ok_or_else(|| ConnectionError::InvalidConnectionUrl("Password not set".into()))?;
686+
let host = url
687+
.host_str()
688+
.ok_or_else(|| ConnectionError::InvalidConnectionUrl("Hostname not set".into()))?;
689+
let port = url.port();
690+
let path = url.path();
691+
let mut url = host.to_owned();
692+
if let Some(port) = port {
693+
write!(url, ":{}", port).expect("Write to string does not fail");
694+
}
695+
url += path;
696+
let mut raw = oracle::Connection::connect(user, password, url)
697+
.map_err(ErrorHelper::from)
698+
.map_err(|e| ConnectionError::CouldntSetupConfiguration(e.into()))?;
699+
raw.set_autocommit(true);
700+
Ok(raw)
701+
}
702+
703+
fn inner_executing_returning_count<T>(&mut self, source: &T) -> Result<usize, Error>
704+
where
705+
T: QueryFragment<Oracle> + QueryId,
706+
{
707+
let mut qb = OciQueryBuilder::default();
708+
709+
source.to_sql(&mut qb, &Oracle)?;
710+
711+
let conn = &self.raw;
712+
let sql = qb.finish();
713+
let mut stmt = conn.statement(&sql);
714+
if !source.is_safe_to_cache_prepared(&Oracle)? {
715+
stmt.exclude_from_cache();
716+
}
717+
let mut stmt = stmt.build().map_err(ErrorHelper::from)?;
718+
let mut bind_collector = OracleBindCollector::default();
719+
720+
source.collect_binds(&mut bind_collector, &mut (), &Oracle)?;
721+
let binds = bind_collector
722+
.binds
723+
.iter()
724+
.map(|(n, b)| -> (&str, &dyn oracle::sql_type::ToSql) {
725+
(n as &str, std::ops::Deref::deref(b))
726+
})
727+
.collect::<Vec<_>>();
728+
729+
if stmt.is_query() {
730+
stmt.query_named(&binds).map_err(ErrorHelper::from)?;
650731
} else {
651-
Ok(0)
732+
stmt.execute_named(&binds).map_err(ErrorHelper::from)?;
652733
}
734+
735+
Ok(stmt.row_count().map_err(ErrorHelper::from)? as usize)
653736
}
654737
}
655738

656-
fn bind_params_to_batch<'a, T, V, Op>(
657-
record: InsertStatement<T, &'a ValuesClause<V, T>, Op>,
739+
fn bind_params_to_batch(
740+
record: &impl QueryFragment<Oracle>,
658741
batch: &mut oracle::Batch,
659-
) -> Result<(), Error>
660-
where
661-
T: Table + 'a,
662-
V: 'a,
663-
InsertStatement<T, &'a ValuesClause<V, T>, Op>: QueryFragment<Oracle>,
664-
{
742+
) -> Result<(), Error> {
665743
let mut bind_collector = OracleBindCollector::default();
666744
record.collect_binds(&mut bind_collector, &mut (), &Oracle)?;
667745
let binds = bind_collector

0 commit comments

Comments
 (0)