Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions backend/core/plugin/api_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,24 @@ import (
"github.com/apache/incubator-devlake/core/errors"
)

// ApiAsyncCallback FIXME ...
type ApiAsyncCallback func(*http.Response) errors.Error

// ApiClientBeforeRequest FIXME ...
type ApiClientBeforeRequest func(req *http.Request) errors.Error

// ApiClientAfterResponse FIXME ...
type ApiClientAfterResponse func(res *http.Response) errors.Error

// ApiClientAbstract defines the functionalities needed by all plugins for Synchronized API Request
type ApiClient interface {
SetData(name string, data interface{})
GetData(name string) interface{}
SetHeaders(headers map[string]string)
SetBeforeFunction(callback ApiClientBeforeRequest)
GetBeforeFunction() ApiClientBeforeRequest
SetAfterFunction(callback ApiClientAfterResponse)
GetAfterFunction() ApiClientAfterResponse
Get(path string, query url.Values, headers http.Header) (*http.Response, errors.Error)
Post(path string, query url.Values, body interface{}, headers http.Header) (*http.Response, errors.Error)
}
15 changes: 7 additions & 8 deletions backend/helpers/pluginhelper/api/api_async_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/apache/incubator-devlake/core/log"
plugin "github.com/apache/incubator-devlake/core/plugin"
"github.com/apache/incubator-devlake/core/utils"
"github.com/apache/incubator-devlake/helpers/pluginhelper/common"
)

// HttpMinStatusRetryCode is which status will retry
Expand Down Expand Up @@ -152,7 +151,7 @@ func (apiClient *ApiAsyncClient) DoAsync(
query url.Values,
body interface{},
header http.Header,
handler common.ApiAsyncCallback,
handler plugin.ApiAsyncCallback,
retry int,
) {
var request func() errors.Error
Expand Down Expand Up @@ -224,7 +223,7 @@ func (apiClient *ApiAsyncClient) DoGetAsync(
path string,
query url.Values,
header http.Header,
handler common.ApiAsyncCallback,
handler plugin.ApiAsyncCallback,
) {
apiClient.DoAsync(http.MethodGet, path, query, nil, header, handler, 0)
}
Expand All @@ -235,7 +234,7 @@ func (apiClient *ApiAsyncClient) DoPostAsync(
query url.Values,
body interface{},
header http.Header,
handler common.ApiAsyncCallback,
handler plugin.ApiAsyncCallback,
) {
apiClient.DoAsync(http.MethodPost, path, query, body, header, handler, 0)
}
Expand All @@ -247,14 +246,14 @@ func (apiClient *ApiAsyncClient) GetNumOfWorkers() int {

// RateLimitedApiClient FIXME ...
type RateLimitedApiClient interface {
DoGetAsync(path string, query url.Values, header http.Header, handler common.ApiAsyncCallback)
DoPostAsync(path string, query url.Values, body interface{}, header http.Header, handler common.ApiAsyncCallback)
DoGetAsync(path string, query url.Values, header http.Header, handler plugin.ApiAsyncCallback)
DoPostAsync(path string, query url.Values, body interface{}, header http.Header, handler plugin.ApiAsyncCallback)
WaitAsync() errors.Error
HasError() bool
NextTick(task func() errors.Error)
GetNumOfWorkers() int
GetAfterFunction() common.ApiClientAfterResponse
SetAfterFunction(callback common.ApiClientAfterResponse)
GetAfterFunction() plugin.ApiClientAfterResponse
SetAfterFunction(callback plugin.ApiClientAfterResponse)
Reset(d time.Duration)
GetTickInterval() time.Duration
Release()
Expand Down
32 changes: 22 additions & 10 deletions backend/helpers/pluginhelper/api/api_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/log"
"github.com/apache/incubator-devlake/core/utils"
"github.com/apache/incubator-devlake/helpers/pluginhelper/common"
)

// ErrIgnoreAndContinue is a error which should be ignored
Expand All @@ -57,8 +56,8 @@ type ApiClient struct {
data map[string]interface{}
data_mutex sync.Mutex

beforeRequest common.ApiClientBeforeRequest
afterResponse common.ApiClientAfterResponse
beforeRequest plugin.ApiClientBeforeRequest
afterResponse plugin.ApiClientAfterResponse
ctx gocontext.Context
logger log.Logger
}
Expand Down Expand Up @@ -92,6 +91,17 @@ func NewApiClientFromConnection(
})
}

apiClient.SetAfterFunction(func(res *http.Response) errors.Error {
if res.StatusCode >= 400 {
bytes, err := io.ReadAll(res.Body)
if err != nil {
return errors.BadInput.Wrap(err, fmt.Sprintf("request failed with status code %d", res.StatusCode))
}
return errors.BadInput.New(fmt.Sprintf("request failed with status code %d, body: %s", res.StatusCode, string(bytes)))
}
return nil
})

return apiClient, nil
}

Expand Down Expand Up @@ -224,23 +234,23 @@ func (apiClient *ApiClient) GetHeaders() map[string]string {
}

// GetBeforeFunction return beforeResponseFunction
func (apiClient *ApiClient) GetBeforeFunction() common.ApiClientBeforeRequest {
func (apiClient *ApiClient) GetBeforeFunction() plugin.ApiClientBeforeRequest {
return apiClient.beforeRequest
}

// SetBeforeFunction will set beforeResponseFunction
func (apiClient *ApiClient) SetBeforeFunction(callback common.ApiClientBeforeRequest) {
func (apiClient *ApiClient) SetBeforeFunction(callback plugin.ApiClientBeforeRequest) {
apiClient.beforeRequest = callback
}

// GetAfterFunction return afterResponseFunction
func (apiClient *ApiClient) GetAfterFunction() common.ApiClientAfterResponse {
func (apiClient *ApiClient) GetAfterFunction() plugin.ApiClientAfterResponse {
return apiClient.afterResponse
}

// SetAfterFunction will set afterResponseFunction
// don't call this function directly in collector, use Collector.AfterResponse instead.
func (apiClient *ApiClient) SetAfterFunction(callback common.ApiClientAfterResponse) {
func (apiClient *ApiClient) SetAfterFunction(callback plugin.ApiClientAfterResponse) {
apiClient.afterResponse = callback
}

Expand Down Expand Up @@ -327,14 +337,15 @@ func (apiClient *ApiClient) Do(
if apiClient.beforeRequest != nil {
err = apiClient.beforeRequest(req)
if err != nil {
return nil, errors.Default.Wrap(err, fmt.Sprintf("error running beforeRequest for %s", req.URL.String()))
apiClient.logError(err, "[api-client] beforeRequest returned error for %s", req.URL.String())
return nil, err
}
}
apiClient.logDebug("[api-client] %v %v", method, *uri)
res, err = errors.Convert01(apiClient.client.Do(req))
if err != nil {
apiClient.logError(err, "[api-client] failed to request %s with error", req.URL.String())
return nil, errors.Default.Wrap(err, fmt.Sprintf("error requesting %s", req.URL.String()))
return nil, err
}
// after receive
if apiClient.afterResponse != nil {
Expand All @@ -345,7 +356,8 @@ func (apiClient *ApiClient) Do(
}
if err != nil {
res.Body.Close()
return nil, errors.Default.Wrap(err, fmt.Sprintf("error running afterRequest for %s", req.URL.String()))
apiClient.logError(err, "[api-client] afterResponse returned error for %s", req.URL.String())
return nil, err
}
}
return res, nil
Expand Down
7 changes: 3 additions & 4 deletions backend/helpers/pluginhelper/api/api_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (

"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/helpers/pluginhelper/common"
)

var _ plugin.SubTask = (*ApiCollector)(nil)
Expand Down Expand Up @@ -87,7 +86,7 @@ type ApiCollectorArgs struct {
// NORMALLY, DO NOT SPECIFY THIS PARAMETER, unless you know what it means
Concurrency int
ResponseParser func(res *http.Response) ([]json.RawMessage, errors.Error)
AfterResponse common.ApiClientAfterResponse
AfterResponse plugin.ApiClientAfterResponse
RequestBody func(reqData *RequestData) map[string]interface{}
Method string
}
Expand Down Expand Up @@ -389,12 +388,12 @@ func (collector *ApiCollector) generateUrl(pager *Pager, input interface{}) (str
}

// GetAfterResponse return apiClient's afterResponseFunction
func (collector *ApiCollector) GetAfterResponse() common.ApiClientAfterResponse {
func (collector *ApiCollector) GetAfterResponse() plugin.ApiClientAfterResponse {
return collector.args.ApiClient.GetAfterFunction()
}

// SetAfterResponse set apiClient's afterResponseFunction
func (collector *ApiCollector) SetAfterResponse(f common.ApiClientAfterResponse) {
func (collector *ApiCollector) SetAfterResponse(f plugin.ApiClientAfterResponse) {
collector.args.ApiClient.SetAfterFunction(f)
}

Expand Down
4 changes: 2 additions & 2 deletions backend/helpers/pluginhelper/api/api_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"testing"

"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/helpers/pluginhelper/common"
"github.com/apache/incubator-devlake/core/plugin"
"github.com/apache/incubator-devlake/helpers/unithelper"
mockdal "github.com/apache/incubator-devlake/mocks/core/dal"
mockapi "github.com/apache/incubator-devlake/mocks/helpers/pluginhelper/api"
Expand Down Expand Up @@ -74,7 +74,7 @@ func TestFetchPageUndetermined(t *testing.T) {
},
Body: io.NopCloser(bytes.NewBufferString(body)),
}
handler := args.Get(3).(common.ApiAsyncCallback)
handler := args.Get(3).(plugin.ApiAsyncCallback)
handler(res)
}).Twice()
mockApi.On("NextTick", mock.Anything).Run(func(args mock.Arguments) {
Expand Down
3 changes: 1 addition & 2 deletions backend/helpers/pluginhelper/api/api_collector_with_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/models"
"github.com/apache/incubator-devlake/core/plugin"
"github.com/apache/incubator-devlake/helpers/pluginhelper/common"
)

// ApiCollectorStateManager save collector state in framework table
Expand Down Expand Up @@ -302,7 +301,7 @@ type FinalizableApiCollectorCommonArgs struct {
Header func(reqData *RequestData, createdAfter *time.Time) (http.Header, errors.Error) // optional, build header for the request
RequestBody func(reqData *RequestData) map[string]interface{} // optional, build request body for the request if the Method set to POST or PUT
MinTickInterval *time.Duration // optional, minimum interval between two requests, some endpoints might have a more conservative rate limit than others within the same instance, you can mitigate this by setting a higher MinTickInterval to override the connection level rate limit.
AfterResponse common.ApiClientAfterResponse // optional, hook to run after each response, would be called before the ResponseParser
AfterResponse plugin.ApiClientAfterResponse // optional, hook to run after each response, would be called before the ResponseParser
ResponseParser func(res *http.Response) ([]json.RawMessage, errors.Error) // required, parse the response body and return a list of entities
}

Expand Down
32 changes: 0 additions & 32 deletions backend/helpers/pluginhelper/common/hooks.go

This file was deleted.