@@ -156,22 +156,18 @@ Logging.prototype.createSink = function(name, config, callback) {
156156 return ;
157157 }
158158
159- var gaxOptions = extend ( {
160- timeout : 1000 // "Deadline Exceeded" errors without.
161- } , config . gaxOptions ) ;
162-
163- delete config . gaxOptions ;
164-
165159 var reqOpts = {
166160 parent : 'projects/' + this . projectId ,
167161 sink : extend ( { } , config , { name : name } )
168162 } ;
169163
164+ delete reqOpts . sink . gaxOptions ;
165+
170166 this . request ( {
171167 client : 'configServiceV2Client' ,
172168 method : 'createSink' ,
173169 reqOpts : reqOpts ,
174- gaxOpts : gaxOptions
170+ gaxOpts : config . gaxOptions
175171 } , function ( err , resp ) {
176172 if ( err ) {
177173 callback ( err , null , resp ) ;
@@ -297,6 +293,7 @@ Logging.prototype.getEntries = function(options, callback) {
297293 reqOpts . resourceNames . push ( 'projects/' + this . projectId ) ;
298294
299295 delete reqOpts . autoPaginate ;
296+ delete reqOpts . gaxOptions ;
300297
301298 var gaxOptions = extend ( {
302299 autoPaginate : options . autoPaginate
@@ -363,14 +360,15 @@ Logging.prototype.getEntriesStream = function(options) {
363360 next ( null , Entry . fromApiResponse_ ( entry ) ) ;
364361 } ) ;
365362
366- userStream . on ( 'reading' , function ( ) {
363+ userStream . once ( 'reading' , function ( ) {
367364 var reqOpts = extend ( {
368365 orderBy : 'timestamp desc'
369366 } , options ) ;
370367 reqOpts . resourceNames = arrify ( reqOpts . resourceNames ) ;
371368 reqOpts . resourceNames . push ( 'projects/' + self . projectId ) ;
372369
373370 delete reqOpts . autoPaginate ;
371+ delete reqOpts . gaxOptions ;
374372
375373 var gaxOptions = extend ( {
376374 autoPaginate : options . autoPaginate
@@ -429,6 +427,7 @@ Logging.prototype.getSinks = function(options, callback) {
429427 } ) ;
430428
431429 delete reqOpts . autoPaginate ;
430+ delete reqOpts . gaxOptions ;
432431
433432 var gaxOptions = extend ( {
434433 autoPaginate : options . autoPaginate
@@ -484,8 +483,9 @@ Logging.prototype.getSinks = function(options, callback) {
484483Logging . prototype . getSinksStream = function ( options ) {
485484 var self = this ;
486485
487- var requestStream ;
486+ options = options || { } ;
488487
488+ var requestStream ;
489489 var userStream = streamEvents ( pumpify . obj ( ) ) ;
490490
491491 userStream . abort = function ( ) {
@@ -496,7 +496,7 @@ Logging.prototype.getSinksStream = function(options) {
496496
497497 var toSinkStream = through . obj ( function ( sink , _ , next ) {
498498 var sinkInstance = self . sink ( sink . name ) ;
499- sink . metadata = sink ;
499+ sinkInstance . metadata = sink ;
500500 next ( null , sinkInstance ) ;
501501 } ) ;
502502
@@ -505,6 +505,8 @@ Logging.prototype.getSinksStream = function(options) {
505505 parent : 'projects/' + self . projectId
506506 } ) ;
507507
508+ delete reqOpts . gaxOptions ;
509+
508510 var gaxOptions = extend ( {
509511 autoPaginate : options . autoPaginate
510512 } , options . gaxOptions ) ;
@@ -566,10 +568,12 @@ Logging.prototype.sink = function(name) {
566568 */
567569Logging . prototype . request = function ( config , callback ) {
568570 var self = this ;
571+ var isStreamMode = ! callback ;
572+
569573 var gaxStream ;
570574 var stream ;
571575
572- if ( ! callback ) {
576+ if ( isStreamMode ) {
573577 stream = streamEvents ( through . obj ( ) ) ;
574578
575579 stream . abort = function ( ) {
@@ -578,43 +582,64 @@ Logging.prototype.request = function(config, callback) {
578582 }
579583 } ;
580584
581- stream . on ( 'reading' , makeRequest ) ;
585+ stream . once ( 'reading' , makeRequestStream ) ;
582586 } else {
583- makeRequest ( ) ;
587+ makeRequestCallback ( ) ;
584588 }
585589
586- function makeRequest ( ) {
590+ function prepareGaxRequest ( callback ) {
587591 self . auth . getProjectId ( function ( err , projectId ) {
588592 if ( err ) {
589- if ( callback ) {
590- callback ( err ) ;
591- } else {
592- stream . destroy ( err ) ;
593- }
593+ callback ( err ) ;
594594 return ;
595595 }
596596
597- var reqOpts = extend ( true , { } , config . reqOpts ) ;
598- reqOpts = common . util . replaceProjectIdToken ( reqOpts , projectId ) ;
597+ var gaxClient = self . api [ config . client ] ;
599598
600- if ( ! self . api [ config . client ] ) {
599+ if ( ! gaxClient ) {
601600 // Lazily instantiate client.
602- self . api [ config . client ] = v2 ( self . options ) [ config . client ] ( self . options ) ;
601+ gaxClient = v2 ( self . options ) [ config . client ] ( self . options ) ;
602+ self . api [ config . client ] = gaxClient ;
603603 }
604604
605- var client = self . api [ config . client ] ;
605+ var reqOpts = extend ( true , { } , config . reqOpts ) ;
606+ reqOpts = common . util . replaceProjectIdToken ( reqOpts , projectId ) ;
606607
607- var gaxRequest = client [ config . method ] ( reqOpts , config . gaxOpts , callback ) ;
608+ var requestFn = gaxClient [ config . method ] . bind (
609+ gaxClient ,
610+ reqOpts ,
611+ config . gaxOpts
612+ ) ;
608613
609- if ( ! callback ) {
610- gaxStream = gaxRequest ;
614+ callback ( null , requestFn ) ;
615+ } ) ;
616+ }
617+
618+ function makeRequestCallback ( ) {
619+ prepareGaxRequest ( function ( err , requestFn ) {
620+ if ( err ) {
621+ callback ( err ) ;
622+ return ;
623+ }
624+
625+ requestFn ( callback ) ;
626+ } ) ;
627+ }
611628
612- gaxStream
613- . on ( 'error' , function ( err ) {
614- stream . destroy ( err ) ;
615- } )
616- . pipe ( stream ) ;
629+ function makeRequestStream ( ) {
630+ prepareGaxRequest ( function ( err , requestFn ) {
631+ if ( err ) {
632+ stream . destroy ( err ) ;
633+ return ;
617634 }
635+
636+ gaxStream = requestFn ( ) ;
637+
638+ gaxStream
639+ . on ( 'error' , function ( err ) {
640+ stream . destroy ( err ) ;
641+ } )
642+ . pipe ( stream ) ;
618643 } ) ;
619644 }
620645
0 commit comments