Skip to content
This repository was archived by the owner on Feb 21, 2023. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ Usage of ccql:
-Q string
Query/queries input file
-d string
Default schema to use (default "information_schema")
Schemas to use (default "information_schema")
-h string
Comma or space delimited list of hosts in hostname[:port] format. If not given, hosts read from stdin
-m uint
Expand Down
11 changes: 8 additions & 3 deletions go/cmd/ccql/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

golib_log "github.com/outbrain/golib/log"
"gopkg.in/gcfg.v1"
"strings"
)

var AppVersion string
Expand All @@ -39,13 +40,15 @@ func main() {
password := flag.String("p", "", "MySQL password")
askPassword := flag.Bool("ask-pass", false, "prompt for MySQL password")
credentialsFile := flag.String("C", "", "Credentials file, expecting [client] scope, with 'user', 'password' fields. Overrides -u and -p")
defaultSchema := flag.String("d", "information_schema", "Default schema to use")
databases := flag.String("d", "information_schema", "List of databases to query from.")
hostsList := flag.String("h", "", "Comma or space delimited list of hosts in hostname[:port] format. If not given, hosts read from stdin")
hostsFile := flag.String("H", "", "Hosts file, hostname[:port] comma or space or newline delimited format. If not given, hosts read from stdin")
queriesText := flag.String("q", "", "Query/queries to execute")
queriesFile := flag.String("Q", "", "Query/queries input file")
timeout := flag.Float64("t", 0, "Connect timeout seconds")
maxConcurrency := flag.Uint("m", 32, "Max concurrent connections")
viewSourceSchema := flag.Bool("v", false, "View the source schema of the results")

flag.Parse()

if AppVersion == "" {
Expand Down Expand Up @@ -109,15 +112,17 @@ func main() {
}

if *askPassword {
fmt.Print("Mysql password: ")
fmt.Println("Mysql password: ")
passwd, err := terminal.ReadPassword(int(syscall.Stdin))
if err != nil {
log.Fatalf("\nError while get password:", err)
}
*password = string(passwd)
}

if err := logic.QueryHosts(hosts, *user, *password, *defaultSchema, queries, *maxConcurrency, *timeout); err != nil {
schemas := strings.Split(*databases, ",")

if err := logic.QuerySchemas(hosts, *user, *password, schemas, queries, *maxConcurrency, *timeout, *viewSourceSchema); err != nil {
os.Exit(1)
}
}
35 changes: 23 additions & 12 deletions go/logic/ccql.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,30 @@ package logic

import (
"fmt"
"github.com/outbrain/golib/sqlutils"
"log"
"strings"

"github.com/outbrain/golib/sqlutils"
"sync"
)

// queryHost connects to a given host, issues the given set of queries, and outputs the results
// line per row in tab delimited format
func queryHost(host string, user string, password string, defaultSchema string, queries []string, timeout float64) error {
mysqlURI := fmt.Sprintf("%s:%s@tcp(%s)/%s?timeout=%fs", user, password, host, defaultSchema, timeout)
func queryHost(host string, user string, password string, schema string, queries []string, timeout float64, viewSourceSchema bool) error {
mysqlURI := fmt.Sprintf("%s:%s@tcp(%s)/%s?timeout=%fs", user, password, host, schema, timeout)
db, _, err := sqlutils.GetDB(mysqlURI)
if err != nil {
return err
}

for _, query := range queries {
resultData, err := sqlutils.QueryResultData(db, query)
if err != nil {
return err
}
for _, row := range resultData {
output := []string{host}
if viewSourceSchema {
output = append(output, schema)
}
for _, rowCell := range row {
output = append(output, rowCell.String)
}
Expand All @@ -34,26 +36,35 @@ func queryHost(host string, user string, password string, defaultSchema string,
return nil
}

// QueryHosts will issue concurrent queries on given list of hosts
func QueryHosts(hosts []string, user string, password string, defaultSchema string, queries []string, maxConcurrency uint, timeout float64) (anyError error) {
func QuerySchemas(hosts []string, user string, password string, schemas []string, queries []string, maxConcurrency uint,
timeout float64, viewSourceSchema bool) (anyError error) {
concurrentHosts := make(chan bool, maxConcurrency)
completedHosts := make(chan bool)

var wg sync.WaitGroup
for _, host := range hosts {
go func(host string) {
wg.Add(len(schemas))
concurrentHosts <- true
if err := queryHost(host, user, password, defaultSchema, queries, timeout); err != nil {
anyError = err
log.Printf("%s %s", host, err.Error())
// For each host, run all queries for the respective schema
for _, schema := range schemas {
go func(schema string) {
defer wg.Done()
if err := queryHost(host, user, password, schema, queries, timeout, viewSourceSchema); err != nil {
anyError = err
log.Printf("%s %s", host, err.Error())
}
}(schema)
}
wg.Wait()
<-concurrentHosts

completedHosts <- true
}(host)
}

// Barrier. Wait for all to complete
for range hosts {
<-completedHosts
}

return anyError
}