googlePubsubR
This vignette will provide a walk through for the most common functions that will cover the majority of use cases.
In order to authenticate the client, the following environment variables will need to be set:
GCP_AUTH_FILE
: path to the .json file containing GCP
credentials for a service account enabled to interact with the Pub/Sub
API. You can also directly pass the path as a function argument.GCP_PROJECT
: your GCP project id.Upon setting the environment variables, it is just a matter of calling:
More authentication methods can be found in the
?pubsub_auth
documentation.
In their most basic form, topics can be created in the following way:
# Create a topic that retains messages for a minimum of 4 hours.
# We'll attach some cutom labels to make it easier to spot
topic <- topics_create(name = "vignette-topic", message_retention_duration = 14400)
# A topic object is returned
topic
# $labels
# $labels$type
# [1] "pkg_vignette"
#
#
# $name
# [1] "projects/<my-gcp-project>/topics/vignette-topic"
#
# $kmsKeyName
# NULL
#
# $satisfiesPzs
# NULL
#
# $messageStoragePolicy
# NULL
#
# $schemaSettings
# NULL
#
# $messageRetentionDuration
# [1] "14400s"
#
# attr(,"class")
# [1] "Topic" "list"
It is possible to interact with pre-existing topic objects. For
instance, one could retrieve a Topic
object from an
existing topic:
if(topics_exists("vignette-topic")) {
topic <- topics_get("vignette-topic")
}
In order to consume messages from a topic, a subscription will be needed:
# You can either pass a Topic object or a topic name
sub <- subscriptions_create(
name = "vignette-sub",
topic = topic,
# Messages will expire after 3 days of inactivity (no messages acked)
expiration_policy = ExpirationPolicy(86400),
# We'll retain unacked messages for 12 hours
msg_retention_duration = 43200,
# We'll retry message delivery with at least 1 second delay from the previous try
retry_policy = RetryPolicy(min_backoff = 1)
)
sub
# $deadLetterPolicy
# NULL
#
# $messageRetentionDuration
# [1] "82400s"
#
# $labels
# NULL
#
# $retryPolicy
# $retryPolicy$minimumBackoff
# [1] "1s"
#
# $retryPolicy$maximumBackoff
# [1] "600s"
#
#
# $pushConfig
# named list()
#
# $ackDeadlineSeconds
# [1] 10
#
# $expirationPolicy
# $expirationPolicy$ttl
# [1] "86400s"
#
#
# $filter
# NULL
#
# $detached
# NULL
#
# $retainAckedMessages
# NULL
#
# $topic
# [1] "projects/<my-gcp-project>/topics/vignette-topic"
#
# $name
# [1] "projects/<my-gcp-project>/subscriptions/vignette-sub"
#
# $enableMessageOrdering
# NULL
#
# $topicMessageRetentionDuration
# [1] "14400s"
#
# attr(,"class")
# [1] "Subscription" "list"
We can also inspect all subscriptions attached to a given topic.
topics_list_subscriptions(topic = "vignette-topic")
Schemas can be used to enforce a specific format on incoming messages, this will force all malformed messages to be discarded or sent to a dead letter queue. A schema object can be passed to a topic upon creation. Getting the schema definition in the right format (Pub/Sub expects it as a string) can be quite fiddly. In this example we’ll define an AVRO schema.
avro_schema <- schemas_create(
name = "vignette-schema",
type = "AVRO",
# toJSON as the API expects a string containing the definition of the AVRO schema
definition = toJSON(list(
name = "cutlery",
type = "record",
fields = list(
list("name" = "name", "type" = "string"),
list("name" = "price", "type" = "int")
)
), auto_unbox = T)
)
# Test a message against the schema
msg <- list(
name = "John",
price = 123
) %>%
toJSON(auto_unbox = T) %>%
charToRaw() %>%
base64enc::base64encode() %>%
PubsubMessage()
schemas_validate_message(schema = "vignette-schema", message = msg, encoding = "JSON")
# [1] TRUE
# Create a new topic and attach the schema
topic <- topics_create(
name = "vignette-topic",
message_retention_duration = 14400,
schema_settings = SchemaSettings(encoding = "JSON", "vignette-schema")
)
seek
There are two ways to seek
a subscription back in
time:
# Create a snapshot of a subscription
snapshot <- snapshots_create(name = "vignette-snap", subscription = "vignette-sub")
# 'Rewind' the subscription to the snapshot we've just created
subscriptions_seek(subscription = "vignette-sub", snapshot = snapshot)
# [1] TRUE
# Seek the subscription to a specific timestamp
subscriptions_seek("vignette-sub", time = "2021-11-08T23:55:00Z")
Pubsub messages are expected encoded as base64 strings, depending on the object you’re dealing with, the process to convert them in the right format might vary. Below we’ll convert a dataframe in a format that will not upset Pubsub that much:
<- data.frame(name = "fork", price = 999) %>%
msg as.list() %>%
toJSON(auto_unbox = TRUE) %>%
charToRaw() %>%
::base64encode() %>%
base64encPubsubMessage()
topics_publish(messages = msg, topic = 'vignette-topic'))
Now that we have successfully published a message (conforming to the
schema specified above) we can pull messages from the subscription. Note
that pulling will not take messages out of the queue (you can do it as
many times you want). Messages will need to be acknowledged with
subscriptions_ack
after they have been successfully
consumed in order to be successfully taken out of the queue.
Pulling messages will return a list containing a
receivedMessages
dataframe containing ackIds and a
dataframe storing and message data and information.
<- subscriptions_pull("vignette-sub")
msgs
::glimpse(msgs)
tibble1
List of $ receivedMessages:'data.frame': 1 obs. of 2 variables:
$ ackId : chr "PkVTRFAGFixdRkhRNxkIaFEOT14jPzUgKEUSC1MTUVx1A1MQaVwzdQdRDRlzejV1aQwRVAsUUHRfURsfWVxEjczJsS9QXWJxa1oQAwJHUH1YUxw"| __truncated__
..$ message:'data.frame': 1 obs. of 4 variables:
..$ data : chr "eyJuYW1lIjoiZm9yayIsInByaWNlIjo5OTl9"
.. ..$ attributes :'data.frame': 1 obs. of 2 variables:
.. ..$ messageId : chr "3410320233787713"
.. ..$ publishTime: chr "2021-11-09T09:06:31.171Z"
.. ..
::glimpse(msgs$receivedMessages$message)
tibble: 1
Rows: 4
Columns$ data <chr> "eyJuYW1lIjoiZm9yayIsInByaWNlIjo5OTl9"
$ attributes <df[,2]> <data.frame[1 x 2]>
$ messageId <chr> "3410320233787713"
$ publishTime <chr> "2021-11-09T09:06:31.171Z
In order to re-convert back message data into an usable format we’ll
basically need to reverse the process that was used to encode them in
the first place. Given that subscriptions_pull
will return
all messages in the queue, a good strategy might be to
decode them all at once using lapply
decoded_msg <- lapply(msgs$receivedMessages$message$data, function(msg) {
msg %>%
base64decode() %>%
rawToChar() %>%
fromJSON(flatten = TRUE, simplifyDataFrame = TRUE) %>%
as.data.frame()
}) %>% do.call(rbind, .)
decoded_msg
# name price
# 1 fork 999
# ... Do something with the dataframe
# Acknowledge we have succesfully used it
subscriptions_ack(ack_ids = msgs$receivedMessages$ackId, subscription = "vignette-sub")
# [1] TRUE
This approach is quite cumbersome and helpers to facilitate this process might be included in the library later on when a general enough approach will be found.